diff --git a/tokio/src/fs/create_dir.rs b/tokio/src/fs/create_dir.rs
index dbc97eb6033..eb256b1328b 100644
--- a/tokio/src/fs/create_dir.rs
+++ b/tokio/src/fs/create_dir.rs
@@ -46,5 +46,23 @@ use std::path::Path;
 /// ```
 pub async fn create_dir(path: impl AsRef<Path>) -> io::Result<()> {
     let path = path.as_ref().to_owned();
+
+    #[cfg(all(
+        tokio_unstable,
+        feature = "io-uring",
+        feature = "rt",
+        feature = "fs",
+        target_os = "linux"
+    ))]
+    {
+        // Prefer the io_uring path when `check_and_init` reports that the
+        // `MkDirAt` opcode can be used; otherwise fall through to `asyncify`.
+        let handle = crate::runtime::Handle::current();
+        let driver_handle = handle.inner.driver().io();
+        if driver_handle.check_and_init(io_uring::opcode::MkDirAt::CODE)? {
+            return crate::runtime::driver::op::Op::mkdir(&path)?.await;
+        }
+    }
+
     asyncify(move || std::fs::create_dir(path)).await
 }
diff --git a/tokio/src/fs/create_dir_all.rs b/tokio/src/fs/create_dir_all.rs
index 1256469b500..3b5ebb35f98 100644
--- a/tokio/src/fs/create_dir_all.rs
+++ b/tokio/src/fs/create_dir_all.rs
@@ -47,5 +47,25 @@ use std::path::Path;
 /// ```
 pub async fn create_dir_all(path: impl AsRef<Path>) -> io::Result<()> {
     let path = path.as_ref().to_owned();
+
+    #[cfg(all(
+        tokio_unstable,
+        feature = "io-uring",
+        feature = "rt",
+        feature = "fs",
+        target_os = "linux"
+    ))]
+    {
+        use crate::fs::create_dir_all_uring;
+
+        // Prefer the io_uring path when `check_and_init` reports that the
+        // `MkDirAt` opcode can be used; otherwise fall through to `asyncify`.
+        let handle = crate::runtime::Handle::current();
+        let driver_handle = handle.inner.driver().io();
+        if driver_handle.check_and_init(io_uring::opcode::MkDirAt::CODE)? {
+            return create_dir_all_uring(&path).await;
+        }
+    }
+
     asyncify(move || std::fs::create_dir_all(path)).await
 }
diff --git a/tokio/src/fs/create_dir_all_uring.rs b/tokio/src/fs/create_dir_all_uring.rs
new file mode 100644
index 00000000000..1b5bd6649d1
--- /dev/null
+++ b/tokio/src/fs/create_dir_all_uring.rs
@@ -0,0 +1,112 @@
+#[cfg(all(
+    tokio_unstable,
+    feature = "io-uring",
+    feature = "rt",
+    feature = "fs",
+    target_os = "linux"
+))]
+use crate::runtime::driver::op::Op;
+use std::ffi::OsStr;
+use std::io;
+use std::os::unix::ffi::OsStrExt;
+use std::path::Path;
+
+/// Creates `path` and any missing parent directories via io_uring `mkdir` operations.
+///
+/// An empty path is a no-op, and a path that already exists as a directory is
+/// not an error.
+pub(crate) async fn create_dir_all_uring(path: &Path) -> io::Result<()> {
+    if path == Path::new("") {
+        return Ok(());
+    }
+
+    // First, try to create the path directly.
+    // This also succeeds if the directory already exists.
+    if mkdir_parent_missing(path).await? {
+        return Ok(());
+    }
+
+    // Otherwise, we must create its parents.
+    // For /a/b/c/d, we must first try /a/b/c, /a/b, /a, and /.
+    // Hence, we can iterate over the positions of / in reverse,
+    // finding the first / that appears after a directory that already exists.
+    //
+    // For example, suppose /a exists, but none of its children.
+    // The creation of /a/b will be successful.
+    // Hence, first_valid_separator_pos = 4.
+    let mut first_valid_separator_pos = None;
+    let path_bytes = path.as_os_str().as_bytes();
+    for (separator_pos, _) in path_bytes
+        .iter()
+        .enumerate()
+        .rev()
+        .filter(|(_, &byte)| byte == b'/')
+    {
+        let parent_bytes = &path_bytes[..separator_pos];
+        let parent_str = OsStr::from_bytes(parent_bytes);
+        let parent_path = Path::new(parent_str);
+        if mkdir_parent_missing(parent_path).await? {
+            first_valid_separator_pos = Some(separator_pos);
+            break;
+        }
+    }
+    let first_valid_separator_pos = first_valid_separator_pos.unwrap_or(0);
+
+    // Once we have found the correct /,
+    // we can iterate the remaining components in the forward direction.
+    //
+    // In the example /a/b/c/d path, there is only one remaining /, after c.
+    // Hence, we first create /a/b/c.
+    //
+    // TODO: We're attempting to create all directories sequentially.
+    // This would benefit from batching.
+    for (separator_pos, _) in path_bytes
+        .iter()
+        .enumerate()
+        .skip(first_valid_separator_pos + 1)
+        .filter(|(_, &byte)| byte == b'/')
+    {
+        let parent_path = Path::new(OsStr::from_bytes(&path_bytes[..separator_pos]));
+        mkdir_parent_created(parent_path).await?;
+    }
+
+    // We must finally create the last path (/a/b/c/d in our example).
+    mkdir_parent_created(path).await?;
+    Ok(())
+}
+
+/// Tries to create `path`, distinguishing a missing parent from other outcomes.
+///
+/// Returns `Ok(true)` if `path` was created or already exists as a directory,
+/// and `Ok(false)` if the attempt failed with `NotFound`. Any other error is
+/// returned unchanged.
+async fn mkdir_parent_missing(path: &Path) -> io::Result<bool> {
+    match Op::mkdir(path)?.await {
+        Ok(()) => Ok(true),
+        Err(ref e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
+        // TODO: replace with uring-based statx
+        Err(_) if is_dir(path).await => Ok(true),
+        Err(e) => Err(e),
+    }
+}
+
+/// Creates `path`, whose parent is expected to exist already.
+///
+/// Succeeds if `path` was created or already exists as a directory; any other
+/// error is returned unchanged.
+async fn mkdir_parent_created(path: &Path) -> io::Result<()> {
+    match Op::mkdir(path)?.await {
+        Ok(()) => Ok(()),
+        // TODO: replace with uring-based statx
+        Err(_) if is_dir(path).await => Ok(()),
+        Err(e) => Err(e),
+    }
+}
+
+/// Returns `true` only if the metadata of `path` can be read and describes a directory.
+///
+/// Lookup failures map to `false` instead of being propagated, so that callers
+/// report the original `mkdir` error rather than the lookup error.
+async fn is_dir(path: &Path) -> bool {
+    matches!(crate::fs::metadata(path).await, Ok(meta) if meta.is_dir())
+}
diff --git a/tokio/src/fs/mod.rs b/tokio/src/fs/mod.rs
index 8701d3a1083..6a817f1de52 100644
--- a/tokio/src/fs/mod.rs
+++ b/tokio/src/fs/mod.rs
@@ -296,7 +296,9 @@ cfg_windows! {
 }
 
 cfg_io_uring! {
+    pub(crate) mod create_dir_all_uring;
     pub(crate) mod read_uring;
+    pub(crate) use self::create_dir_all_uring::create_dir_all_uring;
     pub(crate) use self::read_uring::read_uring;
 
     pub(crate) use self::open_options::UringOpenOptions;
diff --git a/tokio/src/io/uring/mkdir.rs b/tokio/src/io/uring/mkdir.rs
new file mode 100644
index 00000000000..ee4f1f7c323
--- /dev/null
+++ b/tokio/src/io/uring/mkdir.rs
@@ -0,0 +1,49 @@
+use super::utils::cstr;
+
+use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};
+
+use io_uring::{opcode, types};
+use std::ffi::CString;
+use std::io;
+use std::io::Error;
+use std::path::Path;
+
+#[derive(Debug)]
+pub(crate) struct Mkdir {
+    /// This field will be read by the kernel during the operation, so we
+    /// need to ensure it is valid for the entire duration of the operation.
+    #[allow(dead_code)]
+    path: CString,
+}
+
+impl Completable for Mkdir {
+    type Output = io::Result<()>;
+
+    fn complete(self, cqe: CqeResult) -> Self::Output {
+        cqe.result.map(|_| ())
+    }
+
+    fn complete_with_error(self, err: Error) -> Self::Output {
+        Err(err)
+    }
+}
+
+impl Cancellable for Mkdir {
+    fn cancel(self) -> CancelData {
+        CancelData::Mkdir(self)
+    }
+}
+
+impl Op<Mkdir> {
+    /// Submit a request to create a directory.
+    pub(crate) fn mkdir(path: &Path) -> io::Result<Self> {
+        let path = cstr(path)?;
+
+        let mkdir_op = opcode::MkDirAt::new(types::Fd(libc::AT_FDCWD), path.as_ptr())
+            .mode(0o777)
+            .build();
+
+        // SAFETY: Parameters are valid for the entire duration of the operation
+        Ok(unsafe { Op::new(mkdir_op, Mkdir { path }) })
+    }
+}
diff --git a/tokio/src/io/uring/mod.rs b/tokio/src/io/uring/mod.rs
index facad596f63..52754bfc8b0 100644
--- a/tokio/src/io/uring/mod.rs
+++ b/tokio/src/io/uring/mod.rs
@@ -1,3 +1,4 @@
+pub(crate) mod mkdir;
 pub(crate) mod open;
 pub(crate) mod read;
 pub(crate) mod utils;
diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs
index d2b9289ceee..04eb957966a 100644
--- a/tokio/src/runtime/driver/op.rs
+++ b/tokio/src/runtime/driver/op.rs
@@ -1,3 +1,4 @@
+use crate::io::uring::mkdir::Mkdir;
 use crate::io::uring::open::Open;
 use crate::io::uring::read::Read;
 use crate::io::uring::write::Write;
@@ -16,6 +17,7 @@ use std::task::{Context, Poll, Waker};
 #[allow(dead_code)]
 #[derive(Debug)]
 pub(crate) enum CancelData {
+    Mkdir(Mkdir),
     Open(Open),
     Write(Write),
     Read(Read),
diff --git a/tokio/tests/fs_uring_mkdir.rs b/tokio/tests/fs_uring_mkdir.rs
new file mode 100644
index 00000000000..9d1e37e3d97
--- /dev/null
+++ b/tokio/tests/fs_uring_mkdir.rs
@@ -0,0 +1,168 @@
+//! Uring mkdir operations tests.
+
+#![cfg(all(
+    tokio_unstable,
+    feature = "io-uring",
+    feature = "rt",
+    feature = "fs",
+    target_os = "linux"
+))]
+
+use futures::FutureExt;
+use std::future::poll_fn;
+use std::future::Future;
+use std::pin::pin;
+use std::sync::mpsc;
+use std::task::Poll;
+use std::time::Duration;
+use tokio::runtime::{Builder, Runtime};
+use tokio_test::assert_pending;
+use tokio_util::task::TaskTracker;
+
+fn multi_rt(n: usize) -> Box<dyn Fn() -> Runtime> {
+    Box::new(move || {
+        Builder::new_multi_thread()
+            .worker_threads(n)
+            .enable_all()
+            .build()
+            .unwrap()
+    })
+}
+
+fn current_rt() -> Box<dyn Fn() -> Runtime> {
+    Box::new(|| Builder::new_current_thread().enable_all().build().unwrap())
+}
+
+fn rt_combinations() -> Vec<Box<dyn Fn() -> Runtime>> {
+    vec![
+        current_rt(),
+        multi_rt(1),
+        multi_rt(2),
+        multi_rt(8),
+        multi_rt(64),
+        multi_rt(256),
+    ]
+}
+
+#[test]
+fn shutdown_runtime_while_performing_io_uring_ops() {
+    fn run(rt: Runtime) {
+        let (done_tx, done_rx) = mpsc::channel();
+        let workdir = tempfile::tempdir().unwrap();
+
+        rt.spawn(async move {
+            // spawning a bunch of uring operations.
+            for i in 0..usize::MAX {
+                let child = workdir.path().join(format!("{i}"));
+                tokio::spawn(async move {
+                    let mut fut = pin!(tokio::fs::create_dir(&child));
+
+                    poll_fn(|cx| {
+                        assert_pending!(fut.as_mut().poll(cx));
+                        Poll::<()>::Pending
+                    })
+                    .await;
+
+                    fut.await.unwrap();
+                });
+
+                // Avoid busy looping.
+                tokio::task::yield_now().await;
+            }
+        });
+
+        std::thread::spawn(move || {
+            rt.shutdown_timeout(Duration::from_millis(300));
+            done_tx.send(()).unwrap();
+        });
+
+        done_rx.recv().unwrap();
+    }
+
+    for rt in rt_combinations() {
+        run(rt());
+    }
+}
+
+#[test]
+fn create_many_directories() {
+    fn run(rt: Runtime) {
+        let workdir = tempfile::tempdir().unwrap();
+
+        rt.block_on(async move {
+            const N_CHILDREN: usize = 10_000;
+
+            let tracker = TaskTracker::new();
+
+            for i in 0..N_CHILDREN {
+                let child = workdir.path().join(format!("{i}"));
+                tracker.spawn(async move {
+                    tokio::fs::create_dir(&child).await.unwrap();
+                });
+            }
+            tracker.close();
+            tracker.wait().await;
+
+            let mut dir_iter = tokio::fs::read_dir(workdir.path()).await.unwrap();
+            let mut child_count = 0;
+            while dir_iter.next_entry().await.unwrap().is_some() {
+                child_count += 1;
+            }
+            assert_eq!(child_count, N_CHILDREN);
+        });
+    }
+
+    for rt in rt_combinations() {
+        run(rt());
+    }
+}
+
+#[tokio::test]
+async fn create_dir_all_edge_cases() {
+    let workdir = tempfile::tempdir().unwrap();
+    let workdir_path = workdir.path();
+
+    tokio::fs::create_dir_all(workdir.path()).await.unwrap();
+
+    let nested_path = workdir_path.join("foo").join("bar");
+    tokio::fs::create_dir_all(&nested_path).await.unwrap();
+    assert!(nested_path.is_dir());
+    tokio::fs::create_dir_all(nested_path.parent().unwrap())
+        .await
+        .unwrap();
+    tokio::fs::create_dir_all(&nested_path).await.unwrap();
+
+    let slash_trailing = workdir_path.join("./baz/qux//");
+    tokio::fs::create_dir_all(&slash_trailing).await.unwrap();
+    assert!(slash_trailing.is_dir());
+}
+
+#[tokio::test]
+async fn cancel_op_future() {
+    let workdir = tempfile::tempdir().unwrap();
+
+    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
+    let handle = tokio::spawn(async move {
+        poll_fn(|cx| {
+            let child = workdir.path().join("child");
+            let fut = tokio::fs::create_dir(&child);
+
+            // If io_uring is enabled (and not falling back to the thread pool),
+            // the first poll should return Pending.
+            let _pending = pin!(fut).poll_unpin(cx);
+
+            tx.send(()).unwrap();
+
+            Poll::<()>::Pending
+        })
+        .await;
+    });
+
+    // Wait for the first poll
+    rx.recv().await.unwrap();
+
+    handle.abort();
+
+    let res = handle.await.unwrap_err();
+    assert!(res.is_cancelled());
+}