diff --git a/tokio/src/fs/read_uring.rs b/tokio/src/fs/read_uring.rs index 5b38c212246..0dc6b3ce6b2 100644 --- a/tokio/src/fs/read_uring.rs +++ b/tokio/src/fs/read_uring.rs @@ -1,5 +1,5 @@ use crate::fs::OpenOptions; -use crate::runtime::driver::op::Op; +use crate::runtime::driver::op::{CqeResult, Op}; use std::io; use std::io::ErrorKind; @@ -11,16 +11,19 @@ use std::path::Path; const PROBE_SIZE: usize = 32; const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32; +// batch size for batched reads +const BATCH_SIZE: usize = 200; + // Max bytes we can read using io uring submission at a time // SAFETY: cannot be higher than u32::MAX for safe cast // Set to read max 64 MiB at time -const MAX_READ_SIZE: usize = 64 * 1024 * 1024; +pub(crate) const MAX_READ_SIZE: usize = 64 * 1024; pub(crate) async fn read_uring(path: &Path) -> io::Result> { let file = OpenOptions::new().read(true).open(path).await?; // TODO: use io uring in the future to obtain metadata - let size_hint: Option = file.metadata().await.map(|m| m.len() as usize).ok(); + let size_hint = file.metadata().await.map(|m| m.len() as usize).ok(); let fd: OwnedFd = file .try_into_std() @@ -33,10 +36,26 @@ pub(crate) async fn read_uring(path: &Path) -> io::Result> { buf.try_reserve(size_hint)?; } - read_to_end_uring(fd, buf).await + read_to_end_batch(fd, buf).await +} + +async fn read_to_end_batch(fd: OwnedFd, buf: Vec) -> io::Result> { + // try reading in batch if we have substantial capacity + if buf.capacity() > MAX_READ_SIZE { + match Op::read_batch_size::(fd, buf).await { + Ok(buf) => Ok(buf), + Err((r_fd, mut r_buf)) => { + // clear the buffer before we write to it from start again + r_buf.clear(); + read_to_end(r_fd, r_buf).await + } + } + } else { + read_to_end(fd, buf).await + } } -async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec) -> io::Result> { +async fn read_to_end(mut fd: OwnedFd, mut buf: Vec) -> io::Result> { let mut offset = 0; let start_cap = buf.capacity(); @@ -119,16 +138,20 @@ async fn op_read( let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, *offset).await; match res { - Err(e) if e.kind() == ErrorKind::Interrupted => { - buf = r_buf; - fd = r_fd; - } - Err(e) => return Err(e), - Ok(size_read) => { + CqeResult::Single(Ok(size_read)) => { *offset += size_read as u64; return Ok((r_fd, r_buf, size_read == 0)); } + CqeResult::InitErr(e) | CqeResult::Single(Err(e)) => { + if e.kind() == ErrorKind::Interrupted { + buf = r_buf; + fd = r_fd; + } else { + return Err(e); + } + } + CqeResult::Batch(_) => return Err(ErrorKind::Unsupported.into()), } } } diff --git a/tokio/src/io/uring/open.rs b/tokio/src/io/uring/open.rs index 913588c665c..34c755c51a9 100644 --- a/tokio/src/io/uring/open.rs +++ b/tokio/src/io/uring/open.rs @@ -5,7 +5,7 @@ use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult use io_uring::{opcode, types}; use std::ffi::CString; -use std::io::{self, Error}; +use std::io::{self, ErrorKind}; use std::os::fd::FromRawFd; use std::path::Path; @@ -19,13 +19,13 @@ pub(crate) struct Open { impl Completable for Open { type Output = io::Result; - fn complete(self, cqe: CqeResult) -> Self::Output { - cqe.result - .map(|fd| unsafe { crate::fs::File::from_raw_fd(fd as i32) }) - } - fn complete_with_error(self, err: Error) -> Self::Output { - Err(err) + fn complete(self, cqe: CqeResult) -> Self::Output { + match cqe { + CqeResult::Single(Ok(fd)) => Ok(unsafe { crate::fs::File::from_raw_fd(fd as i32) }), + CqeResult::Batch(_) => Err(ErrorKind::Unsupported.into()), + CqeResult::InitErr(err) | CqeResult::Single(Err(err)) => Err(err), + } } } diff --git a/tokio/src/io/uring/read.rs b/tokio/src/io/uring/read.rs index e8ee633ac07..c6b14b8df79 100644 --- a/tokio/src/io/uring/read.rs +++ b/tokio/src/io/uring/read.rs @@ -1,32 +1,40 @@ -use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; +use crate::fs::read_uring::MAX_READ_SIZE; +use crate::runtime::driver::op::{ + i32_to_result, CancelData, Cancellable, Completable, CqeResult, Op, +}; +use io_uring::squeue::{Entry, Flags}; use io_uring::{opcode, types}; -use std::io::{self, Error}; + +use std::io::ErrorKind; +use std::mem::MaybeUninit; use std::os::fd::{AsRawFd, OwnedFd}; +type Output = (CqeResult, OwnedFd, Vec); + #[derive(Debug)] pub(crate) struct Read { fd: OwnedFd, buf: Vec, } -impl Completable for Read { - type Output = (io::Result, OwnedFd, Vec); +impl Completable for Read { + type Output = Output; - fn complete(self, cqe: CqeResult) -> Self::Output { + fn complete(self, cqe: CqeResult) -> Self::Output { let mut buf = self.buf; - if let Ok(len) = cqe.result { + if let CqeResult::Single(Ok(len)) = cqe { + // increase length of buffer on successful + // completion let new_len = buf.len() + len as usize; // SAFETY: Kernel read len bytes unsafe { buf.set_len(new_len) }; } - (cqe.result, self.fd, buf) - } + // Handle rest each batch outside - fn complete_with_error(self, err: Error) -> Self::Output { - (Err(err), self.fd, self.buf) + (cqe, self.fd, buf) } } @@ -58,4 +66,116 @@ impl Op { // SAFETY: Parameters are valid for the entire duration of the operation unsafe { Op::new(read_op, Read { fd, buf }) } } + + // Split file read operations by batches of size N. batches will be executed + // in groups of N. + // + // This function will return the final buffer of bytes in the file + pub(crate) async fn read_batch_size( + mut fd: OwnedFd, + mut buf: Vec, + ) -> Result, (OwnedFd, Vec)> { + // hold batch_size operations and reuse it + let mut n_ops = [const { MaybeUninit::uninit() }; N]; + let mut last_len = 0; + let mut read_len = 0; + let len = buf.capacity(); + + for i in 0..N { + n_ops[i].write(opcode::Nop::new().build()); + } + + let mut n_ops = unsafe { assume_init(n_ops) }; + + // total number of batch entries to read the file completly + for (index, start) in (0..len).step_by(MAX_READ_SIZE).enumerate() { + let end = (start + MAX_READ_SIZE).min(len); // clamp to len for the final chunk + let array_idx = index % N; + + // skip first iteration + if array_idx == 0 && index != 0 { + // SAFETY: Batches are valid array entries + let op = unsafe { Op::batch(n_ops.clone(), Read { fd, buf }) }; + let (_, r_fd, r_buf) = uring_task(op, &mut read_len).await?; + + fd = r_fd; + buf = r_buf; + } else { + let op = opcode::Read::new( + types::Fd(fd.as_raw_fd()), + buf.spare_capacity_mut()[start..].as_mut_ptr().cast(), + (end - start) as u32, + ) + .offset(start as u64) + .build() + // link our sqes so cqes arrive in order + .flags(Flags::IO_LINK); + + n_ops[array_idx] = op; + } + + last_len = array_idx; // save last array_idx for last batch + } + + // Handle last partial batch if there is any + if last_len > 0 { + for (i, entry) in n_ops.iter_mut().enumerate() { + // no op the double counted entries + if i > last_len { + *entry = opcode::Nop::new().build(); + } + } + + // SAFETY: Because of the no-op loop above, we can assume double entry + // read is not possible + let op = unsafe { Op::batch(n_ops, Read { fd, buf }) }; + + let (_, _, r_buf) = uring_task(op, &mut read_len).await?; + + buf = r_buf; + } + + unsafe { + buf.set_len(buf.len() + read_len); + } + + Ok(buf) + } +} + +unsafe fn assume_init(n_ops: [MaybeUninit; N]) -> [Entry; N] { + unsafe { std::mem::transmute_copy(&n_ops) } +} + +// Poll the batch operation and get the Output out of it +async fn uring_task( + op: Op, + read_len: &mut usize, +) -> Result, (OwnedFd, Vec)> { + let (res, r_fd, r_buf) = op.await; + + match res { + CqeResult::Batch(cqes) => { + for cqe in cqes { + let cqe = i32_to_result(cqe); + + match cqe { + Ok(r_size) => match read_len.checked_add(r_size as usize) { + Some(len) => { + *read_len = len; + } + None => return Err((r_fd, r_buf)), + }, + Err(e) => { + if e.kind() != ErrorKind::Interrupted { + return Err((r_fd, r_buf)); + } + } + } + } + } + _ => return Err((r_fd, r_buf)), + }; + + Ok((res, r_fd, r_buf)) } diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs index 7341f7622da..bf1ccc638fa 100644 --- a/tokio/src/io/uring/write.rs +++ b/tokio/src/io/uring/write.rs @@ -2,7 +2,7 @@ use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult use crate::util::as_ref::OwnedBuf; use io_uring::{opcode, types}; -use std::io::{self, Error}; +use std::io::{self, ErrorKind}; use std::os::fd::{AsRawFd, OwnedFd}; #[derive(Debug)] @@ -13,12 +13,15 @@ pub(crate) struct Write { impl Completable for Write { type Output = (io::Result, OwnedBuf, OwnedFd); + fn complete(self, cqe: CqeResult) -> Self::Output { - (cqe.result, self.buf, self.fd) - } + let res = match cqe { + CqeResult::Single(cqe) => cqe, + CqeResult::Batch(_) => Err(ErrorKind::Unsupported.into()), + CqeResult::InitErr(err) => Err(err), + }; - fn complete_with_error(self, err: Error) -> Self::Output { - (Err(err), self.buf, self.fd) + (res, self.buf, self.fd) } } diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index d2b9289ceee..2175ea3cf2c 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -3,14 +3,16 @@ use crate::io::uring::read::Read; use crate::io::uring::write::Write; use crate::runtime::Handle; -use io_uring::cqueue; -use io_uring::squeue::Entry; +use io_uring::{cqueue, squeue}; use std::future::Future; -use std::io::{self, Error}; +use std::io::{self}; use std::mem; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll, Waker}; +const DEFAULT_BATCH_SIZE: usize = u8::MAX as usize; + // This field isn't accessed directly, but it holds cancellation data, // so `#[allow(dead_code)]` is needed. #[allow(dead_code)] @@ -34,91 +36,115 @@ pub(crate) enum Lifecycle { Cancelled( // This field isn't accessed directly, but it holds cancellation data, // so `#[allow(dead_code)]` is needed. - #[allow(dead_code)] CancelData, + #[allow(dead_code)] Arc, ), /// The operation has completed with a single cqe result Completed(io_uring::cqueue::Entry), } -pub(crate) enum State { - Initialize(Option), +/// `const` parameter is only used for Batch +pub(crate) enum State { + // Single operation state + Initialize(Option), Polled(usize), + // Batch operation state + InitializeBatch([squeue::Entry; N]), + PolledBatch([usize; N]), + // Batch or single operation is completed Complete, } -pub(crate) struct Op { +pub(crate) struct Op { // Handle to the runtime handle: Handle, // State of this Op - state: State, + state: State, // Per operation data. data: Option, + // Completed CQEs stored for checking batch completion + completed: [i32; N], } -impl Op { +impl Op { /// # Safety /// /// Callers must ensure that parameters of the entry (such as buffer) are valid and will /// be valid for the entire duration of the operation, otherwise it may cause memory problems. - pub(crate) unsafe fn new(entry: Entry, data: T) -> Self { + pub(crate) unsafe fn new(entry: squeue::Entry, data: T) -> Self { let handle = Handle::current(); + Self { handle, data: Some(data), state: State::Initialize(Some(entry)), + completed: [0; N], + } + } + + pub(crate) unsafe fn batch(entries: [squeue::Entry; N], data: T) -> Self { + let handle = Handle::current(); + + Self { + handle, + data: Some(data), + state: State::InitializeBatch(entries), + completed: [0; N], } } + pub(crate) fn take_data(&mut self) -> Option { self.data.take() } } -impl Drop for Op { +impl Drop for Op { fn drop(&mut self) { match self.state { // We've already dropped this Op. - State::Complete => (), + State::Complete => return, + // This Op has not been polled yet. + // We don't need to do anything here. + State::Initialize(_) | State::InitializeBatch(_) => return, + _ => (), + } + + let data = self.take_data(); + let handle = &mut self.handle; + + match &self.state { // We will cancel this Op. State::Polled(index) => { - let data = self.take_data(); - let handle = &mut self.handle; - handle.inner.driver().io().cancel_op(index, data); + handle.inner.driver().io().cancel_op(*index, data); } - // This Op has not been polled yet. - // We don't need to do anything here. - State::Initialize(_) => (), + State::PolledBatch(ids) => { + handle.inner.driver().io().cancel_batched_op(ids, data); + } + _ => (), } } } -/// A single CQE result -pub(crate) struct CqeResult { - pub(crate) result: io::Result, +/// Result of an completed operation +pub(crate) enum CqeResult { + Single(io::Result), + Batch([i32; N]), + // This is used when you want to terminate an operation with an error. + InitErr(io::Error), } -impl From for CqeResult { +impl From for CqeResult { fn from(cqe: cqueue::Entry) -> Self { - let res = cqe.result(); - let result = if res >= 0 { - Ok(res as u32) - } else { - Err(io::Error::from_raw_os_error(-res)) - }; - CqeResult { result } + CqeResult::Single(cqe_to_result(cqe)) } } /// A trait that converts a CQE result into a usable value for each operation. -pub(crate) trait Completable { +pub(crate) trait Completable { type Output; - fn complete(self, cqe: CqeResult) -> Self::Output; - // This is used when you want to terminate an operation with an error. - // - // The `Op` type that implements this trait can return the passed error - // upstream by embedding it in the `Output`. - fn complete_with_error(self, error: Error) -> Self::Output; + // Called when a single or batch operation is completed + fn complete(self, res: CqeResult) -> Self::Output; } /// Extracts the `CancelData` needed to safely cancel an in-flight io_uring operation. @@ -126,17 +152,18 @@ pub(crate) trait Cancellable { fn cancel(self) -> CancelData; } -impl Unpin for Op {} +impl Unpin for Op {} -impl Future for Op { +impl + Send, const N: usize> Future for Op { type Output = T::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); let handle = &mut this.handle; let driver = handle.inner.driver().io(); + let state = &mut this.state; - match &mut this.state { + match state { State::Initialize(entry_opt) => { let entry = entry_opt.take().expect("Entry must be present"); let waker = cx.waker().clone(); @@ -151,13 +178,87 @@ impl Future for Op { this.state = State::Complete; - return Poll::Ready(data.complete_with_error(err)); + return Poll::Ready(data.complete(CqeResult::InitErr(err))); + } + }; + + Poll::Pending + } + + State::InitializeBatch(entries) => { + let waker = cx.waker().clone(); + + // SAFETY: entry is valid for the entire duration of the operation + match unsafe { driver.register_batch(entries, waker) } { + Ok(ids) => this.state = State::PolledBatch(ids), + Err(err) => { + let data = this + .take_data() + .expect("Data must be present on Initalization"); + + this.state = State::Complete; + + return Poll::Ready(data.complete(CqeResult::InitErr(err))); } }; Poll::Pending } + State::PolledBatch(ids) => { + let mut ctx = driver.get_uring().lock(); + let completed = &mut this.completed; + + for (i, idx) in ids.iter().enumerate() { + let lifecycle = if let Some(lifecycle) = ctx.ops.get_mut(*idx) { + lifecycle + } else { + continue; + }; + + match mem::replace(lifecycle, Lifecycle::Submitted) { + // Only replace the stored waker if it wouldn't wake the new one + Lifecycle::Waiting(prev) if !prev.will_wake(cx.waker()) => { + let waker = cx.waker().clone(); + *lifecycle = Lifecycle::Waiting(waker); + } + + Lifecycle::Waiting(prev) => { + *lifecycle = Lifecycle::Waiting(prev); + } + + Lifecycle::Completed(cqe) => { + // Clean up and complete the future + ctx.remove_op(*idx); + // SAFETY: ids array and completed array of size N + // it is 0oed in the initialization + *unsafe { completed.get_mut(i).unwrap_unchecked() } = cqe.result(); + } + + Lifecycle::Submitted => { + unreachable!("Submitted lifecycle should never be seen here"); + } + + Lifecycle::Cancelled(_) => { + unreachable!("Cancelled lifecycle should never be seen here"); + } + } + } + + if ctx.check_slab_entry(&*ids) { + this.state = State::Complete; + drop(ctx); + + let data = this + .take_data() + .expect("Data must be present on completion"); + + return Poll::Ready(data.complete(CqeResult::Batch(this.completed))); + } + + Poll::Pending + } + State::Polled(idx) => { let mut ctx = driver.get_uring().lock(); let lifecycle = ctx.ops.get_mut(*idx).expect("Lifecycle must be present"); @@ -186,7 +287,8 @@ impl Future for Op { let data = this .take_data() .expect("Data must be present on completion"); - Poll::Ready(data.complete(cqe.into())) + + Poll::Ready(data.complete(CqeResult::Single(cqe_to_result(cqe)))) } Lifecycle::Submitted => { @@ -205,3 +307,17 @@ impl Future for Op { } } } + +pub(crate) fn cqe_to_result(cqe: cqueue::Entry) -> io::Result { + let res = cqe.result(); + + i32_to_result(res) +} + +pub(crate) fn i32_to_result(res: i32) -> std::io::Result { + if res >= 0 { + Ok(res as u32) + } else { + Err(std::io::Error::from_raw_os_error(-res)) + } +} diff --git a/tokio/src/runtime/io/driver/uring.rs b/tokio/src/runtime/io/driver/uring.rs index 89c97826bdf..b5359a10017 100644 --- a/tokio/src/runtime/io/driver/uring.rs +++ b/tokio/src/runtime/io/driver/uring.rs @@ -1,14 +1,17 @@ -use io_uring::{squeue::Entry, IoUring, Probe}; +use io_uring::squeue::Entry; +use io_uring::{IoUring, Probe}; use mio::unix::SourceFd; use slab::Slab; -use crate::runtime::driver::op::{Cancellable, Lifecycle}; -use crate::{io::Interest, loom::sync::Mutex}; - use super::{Handle, TOKEN_WAKEUP}; +use crate::io::Interest; +use crate::loom::sync::Mutex; +use crate::runtime::driver::op::{Cancellable, Lifecycle}; use std::os::fd::{AsRawFd, RawFd}; -use std::{io, mem, task::Waker}; +use std::sync::Arc; +use std::task::Waker; +use std::{io, mem}; const DEFAULT_RING_SIZE: u32 = 256; @@ -59,7 +62,6 @@ impl UringContext { Ok(true) } - pub(crate) fn dispatch_completions(&mut self) { let ops = &mut self.ops; let Some(mut uring) = self.uring.take() else { @@ -116,6 +118,11 @@ impl UringContext { } } + // check if the specified range of slab indexes exist or not + pub(crate) fn check_slab_entry(&self, indexes: &[usize]) -> bool { + indexes.iter().all(|i| self.ops.get(*i).is_none()) + } + pub(crate) fn remove_op(&mut self, index: usize) -> Lifecycle { self.ops.remove(index) } @@ -210,6 +217,60 @@ impl Handle { Ok(()) } + // Register batch operations with io-uring, the returned value is the + // list of indexes. + // + // If the IO_LINK flag is not set the completions may not arrive in order + // for ordering set the IO_LINK flag before passing the entires to this function + // + // This function returns indexes in the slab for each Entry, to ensure all entries + // passed are completed, check against the slab for the indexes returned. + pub(crate) unsafe fn register_batch( + &self, + entries: &mut [Entry; N], + waker: Waker, + ) -> io::Result<[usize; N]> { + assert!(self.uring_probe.initialized()); + + let mut guard = self.get_uring().lock(); + let ctx = &mut *guard; + + let mut indexes = [0; N]; + + for (i, entry) in entries.iter_mut().enumerate() { + let index = ctx.ops.insert(Lifecycle::Waiting(waker.clone())); + entry.set_user_data(index as u64); + indexes[i] = index; + } + + let submit_or_remove = |ctx: &mut UringContext| -> io::Result<()> { + if let Err(e) = ctx.submit() { + // Submission failed, remove the entry from the slab and return the error + for i in indexes.iter() { + ctx.ops.remove(*i); + } + + return Err(e); + } + Ok(()) + }; + + // SAFETY: entry is valid for the entire duration of the operation + while unsafe { ctx.ring_mut().submission().push_multiple(entries).is_err() } { + // If the submission queue is full, flush it to the kernel + submit_or_remove(ctx)?; + } + + // Ensure that the completion queue is not full before submitting the entry. + while ctx.ring_mut().completion().is_full() { + ctx.dispatch_completions(); + } + + submit_or_remove(ctx)?; + + Ok(indexes) + } + /// Register an operation with the io_uring. /// /// If this is the first io_uring operation, it will also initialize the io_uring context. @@ -269,7 +330,7 @@ impl Handle { // uring data alive until the operation completes. let cancel_data = data.expect("Data should be present").cancel(); - match mem::replace(lifecycle, Lifecycle::Cancelled(cancel_data)) { + match mem::replace(lifecycle, Lifecycle::Cancelled(Arc::new(cancel_data))) { Lifecycle::Submitted | Lifecycle::Waiting(_) => (), // The driver saw the completion, but it was never polled. Lifecycle::Completed(_) => { @@ -279,4 +340,27 @@ impl Handle { prev => panic!("Unexpected state: {prev:?}"), }; } + + pub(crate) fn cancel_batched_op(&self, indexes: &[usize], data: Option) { + let mut guard = self.get_uring().lock(); + let ctx = &mut *guard; + let ops = &mut ctx.ops; + + let cancel_data = data.expect("Data should be present").cancel(); + let cancel_rc = Arc::new(cancel_data); + + for &i in indexes.iter() { + if let Some(lifecycle) = ops.get_mut(i) { + match mem::replace(lifecycle, Lifecycle::Cancelled(Arc::clone(&cancel_rc))) { + Lifecycle::Submitted | Lifecycle::Waiting(_) => (), + // The driver saw the completion, but it was never polled. + Lifecycle::Completed(_) => { + // We can safely remove the entry from the slab, as it has already been completed. + ops.remove(i); + } + prev => panic!("Unexpected state: {prev:?}"), + }; + } + } + } }