From b55a8ee9714a84ff864d9ac6f32420fa4c1b9218 Mon Sep 17 00:00:00 2001 From: Daksh <41485688+Daksh14@users.noreply.github.com> Date: Fri, 17 Oct 2025 12:51:01 +0530 Subject: [PATCH 1/4] io-uring: Implement batching mechanism --- tokio/src/fs/read_uring.rs | 57 +++++++-- tokio/src/io/uring/open.rs | 14 +-- tokio/src/io/uring/read.rs | 53 ++++++-- tokio/src/io/uring/write.rs | 13 +- tokio/src/runtime/driver/op.rs | 174 ++++++++++++++++++++++----- tokio/src/runtime/io/driver/uring.rs | 94 +++++++++++++-- tokio/tests/fs_uring_read.rs | 15 +++ 7 files changed, 349 insertions(+), 71 deletions(-) diff --git a/tokio/src/fs/read_uring.rs b/tokio/src/fs/read_uring.rs index 5b38c212246..ecd77415635 100644 --- a/tokio/src/fs/read_uring.rs +++ b/tokio/src/fs/read_uring.rs @@ -1,5 +1,5 @@ use crate::fs::OpenOptions; -use crate::runtime::driver::op::Op; +use crate::runtime::driver::op::{CqeResult, Op}; use std::io; use std::io::ErrorKind; @@ -14,13 +14,13 @@ const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32; // Max bytes we can read using io uring submission at a time // SAFETY: cannot be higher than u32::MAX for safe cast // Set to read max 64 MiB at time -const MAX_READ_SIZE: usize = 64 * 1024 * 1024; +pub(crate) const MAX_READ_SIZE: usize = 64 * 1024 * 1024; pub(crate) async fn read_uring(path: &Path) -> io::Result> { let file = OpenOptions::new().read(true).open(path).await?; // TODO: use io uring in the future to obtain metadata - let size_hint: Option = file.metadata().await.map(|m| m.len() as usize).ok(); + let size_hint = file.metadata().await.map(|m| m.len() as usize).ok(); let fd: OwnedFd = file .try_into_std() @@ -31,12 +31,43 @@ pub(crate) async fn read_uring(path: &Path) -> io::Result> { if let Some(size_hint) = size_hint { buf.try_reserve(size_hint)?; + + println!("{:?}", size_hint); } - read_to_end_uring(fd, buf).await + read_to_end_batch(fd, buf).await } -async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec) -> io::Result> { +async fn read_to_end_batch(fd: 
OwnedFd, mut buf: Vec) -> io::Result> { + let file_len = buf.capacity(); + + if file_len > MAX_READ_SIZE { + let (res, r_fd, mut r_buf) = Op::read_batch(fd, buf, file_len).await; + + if let CqeResult::Batch(cqes) = res { + let mut written_len = 0; + + for cqe in cqes { + if let Ok(entry) = cqe { + written_len += entry as usize; + if entry != MAX_READ_SIZE as u32 { + println!("short read"); + } + } else { + println!("{:?}", "err"); + } + } + + unsafe { r_buf.set_len(written_len) } + } + + Ok(r_buf) + } else { + read_to_end(fd, buf).await + } +} + +async fn read_to_end(mut fd: OwnedFd, mut buf: Vec) -> io::Result> { let mut offset = 0; let start_cap = buf.capacity(); @@ -119,16 +150,20 @@ async fn op_read( let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, *offset).await; match res { - Err(e) if e.kind() == ErrorKind::Interrupted => { - buf = r_buf; - fd = r_fd; - } - Err(e) => return Err(e), - Ok(size_read) => { + CqeResult::Single(Ok(size_read)) => { *offset += size_read as u64; return Ok((r_fd, r_buf, size_read == 0)); } + CqeResult::InitErr(e) | CqeResult::Single(Err(e)) => { + if e.kind() == ErrorKind::Interrupted { + buf = r_buf; + fd = r_fd; + } else { + return Err(e); + } + } + CqeResult::Batch(_) => return Err(ErrorKind::Unsupported.into()), } } } diff --git a/tokio/src/io/uring/open.rs b/tokio/src/io/uring/open.rs index 913588c665c..34c755c51a9 100644 --- a/tokio/src/io/uring/open.rs +++ b/tokio/src/io/uring/open.rs @@ -5,7 +5,7 @@ use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult use io_uring::{opcode, types}; use std::ffi::CString; -use std::io::{self, Error}; +use std::io::{self, ErrorKind}; use std::os::fd::FromRawFd; use std::path::Path; @@ -19,13 +19,13 @@ pub(crate) struct Open { impl Completable for Open { type Output = io::Result; - fn complete(self, cqe: CqeResult) -> Self::Output { - cqe.result - .map(|fd| unsafe { crate::fs::File::from_raw_fd(fd as i32) }) - } - fn complete_with_error(self, err: Error) -> 
Self::Output { - Err(err) + fn complete(self, cqe: CqeResult) -> Self::Output { + match cqe { + CqeResult::Single(Ok(fd)) => Ok(unsafe { crate::fs::File::from_raw_fd(fd as i32) }), + CqeResult::Batch(_) => Err(ErrorKind::Unsupported.into()), + CqeResult::InitErr(err) | CqeResult::Single(Err(err)) => Err(err), + } } } diff --git a/tokio/src/io/uring/read.rs b/tokio/src/io/uring/read.rs index e8ee633ac07..2b2a18e0780 100644 --- a/tokio/src/io/uring/read.rs +++ b/tokio/src/io/uring/read.rs @@ -1,7 +1,8 @@ +use crate::fs::read_uring::MAX_READ_SIZE; use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; +use io_uring::squeue::Flags; use io_uring::{opcode, types}; -use std::io::{self, Error}; use std::os::fd::{AsRawFd, OwnedFd}; #[derive(Debug)] @@ -11,22 +12,23 @@ pub(crate) struct Read { } impl Completable for Read { - type Output = (io::Result, OwnedFd, Vec); + type Output = (CqeResult, OwnedFd, Vec); fn complete(self, cqe: CqeResult) -> Self::Output { let mut buf = self.buf; - if let Ok(len) = cqe.result { - let new_len = buf.len() + len as usize; - // SAFETY: Kernel read len bytes - unsafe { buf.set_len(new_len) }; + match cqe { + // increase length of buffer on successful + // completion + CqeResult::Single(Ok(len)) => { + let new_len = buf.len() + len as usize; + // SAFETY: Kernel read len bytes + unsafe { buf.set_len(new_len) }; + } + _ => (), } - (cqe.result, self.fd, buf) - } - - fn complete_with_error(self, err: Error) -> Self::Output { - (Err(err), self.fd, self.buf) + (cqe, self.fd, buf) } } @@ -58,4 +60,33 @@ impl Op { // SAFETY: Parameters are valid for the entire duration of the operation unsafe { Op::new(read_op, Read { fd, buf }) } } + + // Submit batch requests to read a FD, the function splits reads by MAX_READ_SIZE + // and returns a list of CQEs which can be used to determine if read was successful + // or not + pub(crate) fn read_batch(fd: OwnedFd, mut buf: Vec, len: usize) -> Self { + let entries = { + // total 
number of batch entries to read the file completly + let mut batch_entries = Vec::new(); + + for start in (0..len).step_by(MAX_READ_SIZE) { + let end = (start + MAX_READ_SIZE).min(len); // clamp to len for the final chunk + // MAX_READ_SIZE is less than u32 + let len = (end - start) as u32; + + let buf_mut_ptr = buf.spare_capacity_mut().as_mut_ptr().cast(); + + let op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len) + .offset(start as u64) + .build() + .flags(Flags::IO_LINK); + + batch_entries.push(op); + } + + batch_entries + }; + + unsafe { Op::batch(entries, Read { fd, buf }) } + } } diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs index 7341f7622da..bf1ccc638fa 100644 --- a/tokio/src/io/uring/write.rs +++ b/tokio/src/io/uring/write.rs @@ -2,7 +2,7 @@ use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult use crate::util::as_ref::OwnedBuf; use io_uring::{opcode, types}; -use std::io::{self, Error}; +use std::io::{self, ErrorKind}; use std::os::fd::{AsRawFd, OwnedFd}; #[derive(Debug)] @@ -13,12 +13,15 @@ pub(crate) struct Write { impl Completable for Write { type Output = (io::Result, OwnedBuf, OwnedFd); + fn complete(self, cqe: CqeResult) -> Self::Output { - (cqe.result, self.buf, self.fd) - } + let res = match cqe { + CqeResult::Single(cqe) => cqe, + CqeResult::Batch(_) => Err(ErrorKind::Unsupported.into()), + CqeResult::InitErr(err) => Err(err), + }; - fn complete_with_error(self, err: Error) -> Self::Output { - (Err(err), self.buf, self.fd) + (res, self.buf, self.fd) } } diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index d2b9289ceee..4138aaab512 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -3,12 +3,12 @@ use crate::io::uring::read::Read; use crate::io::uring::write::Write; use crate::runtime::Handle; -use io_uring::cqueue; -use io_uring::squeue::Entry; +use io_uring::{cqueue, squeue}; use std::future::Future; -use 
std::io::{self, Error}; +use std::io::{self}; use std::mem; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll, Waker}; // This field isn't accessed directly, but it holds cancellation data, @@ -34,7 +34,7 @@ pub(crate) enum Lifecycle { Cancelled( // This field isn't accessed directly, but it holds cancellation data, // so `#[allow(dead_code)]` is needed. - #[allow(dead_code)] CancelData, + #[allow(dead_code)] Arc, ), /// The operation has completed with a single cqe result @@ -42,8 +42,13 @@ pub(crate) enum Lifecycle { } pub(crate) enum State { - Initialize(Option), + // Single operation state + Initialize(Option), Polled(usize), + // Batch operation state + InitalizeBatch(Vec), + PolledBatch(Vec), + // Batch or single operation is completed Complete, } @@ -54,6 +59,10 @@ pub(crate) struct Op { state: State, // Per operation data. data: Option, + // indexes of slab of registered operation + indexes: Vec, + // Completed CQEs stored for checking batch completion + completed: Vec>, } impl Op { @@ -61,14 +70,30 @@ impl Op { /// /// Callers must ensure that parameters of the entry (such as buffer) are valid and will /// be valid for the entire duration of the operation, otherwise it may cause memory problems. - pub(crate) unsafe fn new(entry: Entry, data: T) -> Self { + pub(crate) unsafe fn new(entry: squeue::Entry, data: T) -> Self { let handle = Handle::current(); + Self { handle, data: Some(data), state: State::Initialize(Some(entry)), + indexes: Vec::new(), + completed: Vec::new(), } } + + pub(crate) unsafe fn batch(entries: Vec, data: T) -> Self { + let handle = Handle::current(); + + Self { + handle, + data: Some(data), + state: State::InitalizeBatch(entries), + indexes: Vec::new(), + completed: Vec::new(), + } + } + pub(crate) fn take_data(&mut self) -> Option { self.data.take() } @@ -78,47 +103,49 @@ impl Drop for Op { fn drop(&mut self) { match self.state { // We've already dropped this Op. 
- State::Complete => (), + State::Complete => return, + // This Op has not been polled yet. + // We don't need to do anything here. + State::Initialize(_) | State::InitalizeBatch(_) => return, + _ => (), + } + + let data = self.take_data(); + let handle = &mut self.handle; + + match &self.state { // We will cancel this Op. State::Polled(index) => { - let data = self.take_data(); - let handle = &mut self.handle; - handle.inner.driver().io().cancel_op(index, data); + handle.inner.driver().io().cancel_op(*index, data); } - // This Op has not been polled yet. - // We don't need to do anything here. - State::Initialize(_) => (), + State::PolledBatch(ids) => { + handle.inner.driver().io().cancel_batched_op(ids, data); + } + _ => (), } } } -/// A single CQE result -pub(crate) struct CqeResult { - pub(crate) result: io::Result, +/// Result of an completed operation +pub(crate) enum CqeResult { + Single(io::Result), + Batch(Vec>), + // This is used when you want to terminate an operation with an error. + InitErr(io::Error), } impl From for CqeResult { fn from(cqe: cqueue::Entry) -> Self { - let res = cqe.result(); - let result = if res >= 0 { - Ok(res as u32) - } else { - Err(io::Error::from_raw_os_error(-res)) - }; - CqeResult { result } + CqeResult::Single(cqe_to_result(cqe)) } } /// A trait that converts a CQE result into a usable value for each operation. pub(crate) trait Completable { type Output; - fn complete(self, cqe: CqeResult) -> Self::Output; - // This is used when you want to terminate an operation with an error. - // - // The `Op` type that implements this trait can return the passed error - // upstream by embedding it in the `Output`. - fn complete_with_error(self, error: Error) -> Self::Output; + // Called when a single or batch operation is completed + fn complete(self, res: CqeResult) -> Self::Output; } /// Extracts the `CancelData` needed to safely cancel an in-flight io_uring operation. 
@@ -151,13 +178,91 @@ impl Future for Op { this.state = State::Complete; - return Poll::Ready(data.complete_with_error(err)); + return Poll::Ready(data.complete(CqeResult::InitErr(err))); } }; Poll::Pending } + State::InitalizeBatch(entries) => { + let waker = cx.waker().clone(); + + // SAFETY: entry is valid for the entire duration of the operation + match unsafe { driver.register_batch(entries, waker) } { + Ok(ids) => { + this.indexes = ids.clone(); + this.state = State::PolledBatch(ids) + } + Err(err) => { + let data = this + .take_data() + .expect("Data must be present on Initalization"); + + this.state = State::Complete; + + return Poll::Ready(data.complete(CqeResult::InitErr(err))); + } + }; + + Poll::Pending + } + + State::PolledBatch(ids) => { + let mut ctx = driver.get_uring().lock(); + let completed = &mut this.completed; + + for idx in ids.iter() { + let lifecycle = if let Some(lifecycle) = ctx.ops.get_mut(*idx) { + lifecycle + } else { + continue; + }; + + match mem::replace(lifecycle, Lifecycle::Submitted) { + // Only replace the stored waker if it wouldn't wake the new one + Lifecycle::Waiting(prev) if !prev.will_wake(cx.waker()) => { + let waker = cx.waker().clone(); + *lifecycle = Lifecycle::Waiting(waker); + } + + Lifecycle::Waiting(prev) => { + *lifecycle = Lifecycle::Waiting(prev); + } + + Lifecycle::Completed(cqe) => { + // Clean up and complete the future + ctx.remove_op(*idx); + completed.push(cqe_to_result(cqe)); + } + + Lifecycle::Submitted => { + unreachable!("Submitted lifecycle should never be seen here"); + } + + Lifecycle::Cancelled(_) => { + unreachable!("Cancelled lifecycle should never be seen here"); + } + } + } + + if ctx.check_slab_entry(ids) { + this.state = State::Complete; + drop(ctx); + + let cqes = &mut this.completed; + let completed = std::mem::take(cqes); + + let data = this + .take_data() + .expect("Data must be present on completion"); + + return Poll::Ready(data.complete(CqeResult::Batch(completed))); + } + + 
Poll::Pending + } + State::Polled(idx) => { let mut ctx = driver.get_uring().lock(); let lifecycle = ctx.ops.get_mut(*idx).expect("Lifecycle must be present"); @@ -205,3 +310,12 @@ impl Future for Op { } } } + +fn cqe_to_result(cqe: cqueue::Entry) -> io::Result { + let res = cqe.result(); + if res >= 0 { + Ok(res as u32) + } else { + Err(io::Error::from_raw_os_error(-res)) + } +} diff --git a/tokio/src/runtime/io/driver/uring.rs b/tokio/src/runtime/io/driver/uring.rs index 89c97826bdf..b5076b7eed8 100644 --- a/tokio/src/runtime/io/driver/uring.rs +++ b/tokio/src/runtime/io/driver/uring.rs @@ -1,14 +1,18 @@ -use io_uring::{squeue::Entry, IoUring, Probe}; +use io_uring::squeue::Entry; +use io_uring::IoUring; use mio::unix::SourceFd; use slab::Slab; -use crate::runtime::driver::op::{Cancellable, Lifecycle}; -use crate::{io::Interest, loom::sync::Mutex}; - use super::{Handle, TOKEN_WAKEUP}; +use crate::io::Interest; +use crate::loom::sync::atomic::Ordering; +use crate::loom::sync::Mutex; +use crate::runtime::driver::op::{Cancellable, Lifecycle}; use std::os::fd::{AsRawFd, RawFd}; -use std::{io, mem, task::Waker}; +use std::sync::Arc; +use std::task::Waker; +use std::{io, mem}; const DEFAULT_RING_SIZE: u32 = 256; @@ -59,7 +63,6 @@ impl UringContext { Ok(true) } - pub(crate) fn dispatch_completions(&mut self) { let ops = &mut self.ops; let Some(mut uring) = self.uring.take() else { @@ -116,6 +119,11 @@ impl UringContext { } } + // check if the specified range of slab indexes exist or not + pub(crate) fn check_slab_entry(&self, indexes: &[usize]) -> bool { + indexes.iter().all(|i| self.ops.get(*i).is_none()) + } + pub(crate) fn remove_op(&mut self, index: usize) -> Lifecycle { self.ops.remove(index) } @@ -210,6 +218,55 @@ impl Handle { Ok(()) } + // Register batch operations with io-uring, the returned value is the + // list of indexes. 
+ // + // If the IO_LINK flag is not set the completions may not arrive in order + // for ordering set the IO_LINK flag before passing the entires to this function + pub(crate) unsafe fn register_batch( + &self, + entries: &mut [Entry], + waker: Waker, + ) -> io::Result> { + if !self.check_and_init()? { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); + } + + let mut guard = self.get_uring().lock(); + let ctx = &mut *guard; + + let length = entries.len(); + let mut indexes: Vec = Vec::with_capacity(length); + + for entry in entries.iter_mut() { + let index = ctx.ops.insert(Lifecycle::Waiting(waker.clone())); + entry.set_user_data(index as u64); + indexes.push(index); + } + + let submit_or_remove = |ctx: &mut UringContext| -> io::Result<()> { + if let Err(e) = ctx.submit() { + // Submission failed, remove the entry from the slab and return the error + for i in indexes.iter() { + ctx.ops.remove(*i); + } + + return Err(e); + } + Ok(()) + }; + + // SAFETY: entry is valid for the entire duration of the operation + while unsafe { ctx.ring_mut().submission().push_multiple(entries).is_err() } { + // If the submission queue is full, flush it to the kernel + submit_or_remove(ctx)?; + } + + submit_or_remove(ctx)?; + + Ok(indexes) + } + /// Register an operation with the io_uring. /// /// If this is the first io_uring operation, it will also initialize the io_uring context. @@ -269,7 +326,7 @@ impl Handle { // uring data alive until the operation completes. let cancel_data = data.expect("Data should be present").cancel(); - match mem::replace(lifecycle, Lifecycle::Cancelled(cancel_data)) { + match mem::replace(lifecycle, Lifecycle::Cancelled(Arc::new(cancel_data))) { Lifecycle::Submitted | Lifecycle::Waiting(_) => (), // The driver saw the completion, but it was never polled. 
Lifecycle::Completed(_) => { @@ -279,4 +336,27 @@ impl Handle { prev => panic!("Unexpected state: {prev:?}"), }; } + + pub(crate) fn cancel_batched_op(&self, indexes: &[usize], data: Option) { + let mut guard = self.get_uring().lock(); + let ctx = &mut *guard; + let ops = &mut ctx.ops; + + let cancel_data = data.expect("Data should be present").cancel(); + let cancel_rc = Arc::new(cancel_data); + + for &i in indexes.iter() { + if let Some(lifecycle) = ops.get_mut(i) { + match mem::replace(lifecycle, Lifecycle::Cancelled(Arc::clone(&cancel_rc))) { + Lifecycle::Submitted | Lifecycle::Waiting(_) => (), + // The driver saw the completion, but it was never polled. + Lifecycle::Completed(_) => { + // We can safely remove the entry from the slab, as it has already been completed. + ops.remove(i); + } + prev => panic!("Unexpected state: {prev:?}"), + }; + } + } + } } diff --git a/tokio/tests/fs_uring_read.rs b/tokio/tests/fs_uring_read.rs index 4ce6d454ab6..7eb4a889d27 100644 --- a/tokio/tests/fs_uring_read.rs +++ b/tokio/tests/fs_uring_read.rs @@ -131,6 +131,21 @@ async fn read_small_large_files() { assert_eq!(bytes, create_buf(20)); } +#[tokio::test] +async fn read_small_large_files() { + let (_tmp, path) = create_large_temp_file(); + + let bytes = read(path).await.unwrap(); + + assert_eq!(bytes, create_buf(5000)); + + let (_tmp, path) = create_small_temp_file(); + + let bytes = read(path).await.unwrap(); + + assert_eq!(bytes, create_buf(20)); +} + #[tokio::test] async fn cancel_op_future() { let (_tmp_file, path): (Vec, Vec) = create_tmp_files(1); From 6a94ff4613026f9aef801db7df6b0d7da1b0f68f Mon Sep 17 00:00:00 2001 From: Daksh <41485688+Daksh14@users.noreply.github.com> Date: Wed, 31 Dec 2025 18:19:29 +0530 Subject: [PATCH 2/4] io-uring: Re-think batch read implementation Use fixed size batch sizes of 128 for testing --- tokio/src/fs/read_uring.rs | 28 ++------- tokio/src/io/uring/read.rs | 89 +++++++++++++++++++++------- tokio/src/runtime/driver/op.rs | 8 +-- 
tokio/src/runtime/io/driver/uring.rs | 5 ++ tokio/tests/fs_uring_read.rs | 15 ----- 5 files changed, 82 insertions(+), 63 deletions(-) diff --git a/tokio/src/fs/read_uring.rs b/tokio/src/fs/read_uring.rs index ecd77415635..6d55082ed44 100644 --- a/tokio/src/fs/read_uring.rs +++ b/tokio/src/fs/read_uring.rs @@ -31,37 +31,21 @@ pub(crate) async fn read_uring(path: &Path) -> io::Result> { if let Some(size_hint) = size_hint { buf.try_reserve(size_hint)?; - - println!("{:?}", size_hint); } read_to_end_batch(fd, buf).await } -async fn read_to_end_batch(fd: OwnedFd, mut buf: Vec) -> io::Result> { +async fn read_to_end_batch(fd: OwnedFd, buf: Vec) -> io::Result> { let file_len = buf.capacity(); + let batch_size = 128; + // try reading in batch if we have substantial capacity if file_len > MAX_READ_SIZE { - let (res, r_fd, mut r_buf) = Op::read_batch(fd, buf, file_len).await; - - if let CqeResult::Batch(cqes) = res { - let mut written_len = 0; - - for cqe in cqes { - if let Ok(entry) = cqe { - written_len += entry as usize; - if entry != MAX_READ_SIZE as u32 { - println!("short read"); - } - } else { - println!("{:?}", "err"); - } - } - - unsafe { r_buf.set_len(written_len) } + match Op::read_batch_size(fd, buf, file_len, batch_size).await { + Ok(buf) => Ok(buf), + Err((r_fd, r_buf)) => read_to_end(r_fd, r_buf).await, } - - Ok(r_buf) } else { read_to_end(fd, buf).await } diff --git a/tokio/src/io/uring/read.rs b/tokio/src/io/uring/read.rs index 2b2a18e0780..c19b949a43e 100644 --- a/tokio/src/io/uring/read.rs +++ b/tokio/src/io/uring/read.rs @@ -1,8 +1,9 @@ use crate::fs::read_uring::MAX_READ_SIZE; use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; -use io_uring::squeue::Flags; +use io_uring::squeue::{Entry, Flags}; use io_uring::{opcode, types}; +use std::io::ErrorKind; use std::os::fd::{AsRawFd, OwnedFd}; #[derive(Debug)] @@ -17,17 +18,16 @@ impl Completable for Read { fn complete(self, cqe: CqeResult) -> Self::Output { let mut buf = 
self.buf; - match cqe { + if let CqeResult::Single(Ok(len)) = cqe { // increase length of buffer on successful // completion - CqeResult::Single(Ok(len)) => { - let new_len = buf.len() + len as usize; - // SAFETY: Kernel read len bytes - unsafe { buf.set_len(new_len) }; - } - _ => (), + let new_len = buf.len() + len as usize; + // SAFETY: Kernel read len bytes + unsafe { buf.set_len(new_len) }; } + // Handle rest each batch outside + (cqe, self.fd, buf) } } @@ -61,32 +61,77 @@ impl Op { unsafe { Op::new(read_op, Read { fd, buf }) } } - // Submit batch requests to read a FD, the function splits reads by MAX_READ_SIZE - // and returns a list of CQEs which can be used to determine if read was successful - // or not - pub(crate) fn read_batch(fd: OwnedFd, mut buf: Vec, len: usize) -> Self { - let entries = { - // total number of batch entries to read the file completly - let mut batch_entries = Vec::new(); - - for start in (0..len).step_by(MAX_READ_SIZE) { + // Split file read operations by batches of size batch_size. 
batches will be executed in groupts + // of batch_size + pub(crate) async fn read_batch_size( + mut fd: OwnedFd, + mut buf: Vec, + len: usize, + batch_size: usize, + ) -> Result, (OwnedFd, Vec)> { + // hold multiple >= batch_size length vectors of operations + let mut batches_of_ops = Vec::new(); + // hold batch_size operations and reuse it + let mut n_ops = Vec::with_capacity(size_of::() * batch_size); + + // total number of batch entries to read the file completly + for (index, start) in (0..len).step_by(MAX_READ_SIZE).enumerate() { + if (index + 1) % batch_size == 0 { + batches_of_ops.push(n_ops.clone()); + n_ops.clear(); + } else { let end = (start + MAX_READ_SIZE).min(len); // clamp to len for the final chunk // MAX_READ_SIZE is less than u32 let len = (end - start) as u32; - let buf_mut_ptr = buf.spare_capacity_mut().as_mut_ptr().cast(); + let buf_mut_ptr = buf.spare_capacity_mut()[start..].as_mut_ptr().cast(); let op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len) .offset(start as u64) .build() .flags(Flags::IO_LINK); - batch_entries.push(op); + n_ops.push(op); } + } - batch_entries - }; + // push out the last batches + batches_of_ops.push(n_ops.clone()); + + let mut read_size = 0; + + for batches in batches_of_ops { + let op = unsafe { Op::batch(batches, Read { fd, buf }) }; + // TODO: Maybe we can put this in a tokio task + let (res, r_fd, r_buf) = op.await; + + match res { + CqeResult::Batch(cqes) => { + for cqe in cqes { + match cqe { + Ok(r_size) => { + read_size += r_size as usize; + } + Err(e) => { + if e.kind() != ErrorKind::Interrupted { + return Err((r_fd, r_buf)); + } + } + } + } + } + _ => return Err((r_fd, r_buf)), + }; + + fd = r_fd; + buf = r_buf; + } + + // SAFETY: We have read `read_size` amount + unsafe { + buf.set_len(buf.len() + read_size); + } - unsafe { Op::batch(entries, Read { fd, buf }) } + Ok(buf) } } diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index 4138aaab512..eea283b0492 100644 
--- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -46,7 +46,7 @@ pub(crate) enum State { Initialize(Option), Polled(usize), // Batch operation state - InitalizeBatch(Vec), + InitializeBatch(Vec), PolledBatch(Vec), // Batch or single operation is completed Complete, @@ -88,7 +88,7 @@ impl Op { Self { handle, data: Some(data), - state: State::InitalizeBatch(entries), + state: State::InitializeBatch(entries), indexes: Vec::new(), completed: Vec::new(), } @@ -106,7 +106,7 @@ impl Drop for Op { State::Complete => return, // This Op has not been polled yet. // We don't need to do anything here. - State::Initialize(_) | State::InitalizeBatch(_) => return, + State::Initialize(_) | State::InitializeBatch(_) => return, _ => (), } @@ -185,7 +185,7 @@ impl Future for Op { Poll::Pending } - State::InitalizeBatch(entries) => { + State::InitializeBatch(entries) => { let waker = cx.waker().clone(); // SAFETY: entry is valid for the entire duration of the operation diff --git a/tokio/src/runtime/io/driver/uring.rs b/tokio/src/runtime/io/driver/uring.rs index b5076b7eed8..bc89d6aba10 100644 --- a/tokio/src/runtime/io/driver/uring.rs +++ b/tokio/src/runtime/io/driver/uring.rs @@ -262,6 +262,11 @@ impl Handle { submit_or_remove(ctx)?; } + // Ensure that the completion queue is not full before submitting the entry. 
+ while ctx.ring_mut().completion().is_full() { + ctx.dispatch_completions(); + } + submit_or_remove(ctx)?; Ok(indexes) diff --git a/tokio/tests/fs_uring_read.rs b/tokio/tests/fs_uring_read.rs index 7eb4a889d27..4ce6d454ab6 100644 --- a/tokio/tests/fs_uring_read.rs +++ b/tokio/tests/fs_uring_read.rs @@ -131,21 +131,6 @@ async fn read_small_large_files() { assert_eq!(bytes, create_buf(20)); } -#[tokio::test] -async fn read_small_large_files() { - let (_tmp, path) = create_large_temp_file(); - - let bytes = read(path).await.unwrap(); - - assert_eq!(bytes, create_buf(5000)); - - let (_tmp, path) = create_small_temp_file(); - - let bytes = read(path).await.unwrap(); - - assert_eq!(bytes, create_buf(20)); -} - #[tokio::test] async fn cancel_op_future() { let (_tmp_file, path): (Vec, Vec) = create_tmp_files(1); From 4e692409e73d9434ff9a10576c4338a266ff6fda Mon Sep 17 00:00:00 2001 From: Daksh <41485688+Daksh14@users.noreply.github.com> Date: Fri, 2 Jan 2026 21:51:16 +0530 Subject: [PATCH 3/4] io-uring: use const generics for batch_size Clear buffer if error and read_to_end Use single cqe_to_result function --- tokio/src/fs/read_uring.rs | 12 ++- tokio/src/io/uring/read.rs | 142 ++++++++++++++++++--------- tokio/src/runtime/driver/op.rs | 76 +++++++------- tokio/src/runtime/io/driver/uring.rs | 23 +++-- 4 files changed, 152 insertions(+), 101 deletions(-) diff --git a/tokio/src/fs/read_uring.rs b/tokio/src/fs/read_uring.rs index 6d55082ed44..c51909020d3 100644 --- a/tokio/src/fs/read_uring.rs +++ b/tokio/src/fs/read_uring.rs @@ -11,6 +11,9 @@ use std::path::Path; const PROBE_SIZE: usize = 32; const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32; +// batch size for batched reads +const BATCH_SIZE: usize = 128; + // Max bytes we can read using io uring submission at a time // SAFETY: cannot be higher than u32::MAX for safe cast // Set to read max 64 MiB at time @@ -38,13 +41,16 @@ pub(crate) async fn read_uring(path: &Path) -> io::Result> { async fn read_to_end_batch(fd: OwnedFd, 
buf: Vec) -> io::Result> { let file_len = buf.capacity(); - let batch_size = 128; // try reading in batch if we have substantial capacity if file_len > MAX_READ_SIZE { - match Op::read_batch_size(fd, buf, file_len, batch_size).await { + match Op::read_batch_size::(fd, buf, file_len).await { Ok(buf) => Ok(buf), - Err((r_fd, r_buf)) => read_to_end(r_fd, r_buf).await, + Err((r_fd, mut r_buf)) => { + // clear the buffer before we write to it from start again + r_buf.clear(); + read_to_end(r_fd, r_buf).await + } } } else { read_to_end(fd, buf).await diff --git a/tokio/src/io/uring/read.rs b/tokio/src/io/uring/read.rs index c19b949a43e..345bb31e082 100644 --- a/tokio/src/io/uring/read.rs +++ b/tokio/src/io/uring/read.rs @@ -1,21 +1,27 @@ use crate::fs::read_uring::MAX_READ_SIZE; -use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; +use crate::runtime::driver::op::{ + i32_to_result, CancelData, Cancellable, Completable, CqeResult, Op, +}; use io_uring::squeue::{Entry, Flags}; use io_uring::{opcode, types}; + use std::io::ErrorKind; +use std::mem::MaybeUninit; use std::os::fd::{AsRawFd, OwnedFd}; +type Output = (CqeResult, OwnedFd, Vec); + #[derive(Debug)] pub(crate) struct Read { fd: OwnedFd, buf: Vec, } -impl Completable for Read { - type Output = (CqeResult, OwnedFd, Vec); +impl Completable for Read { + type Output = Output; - fn complete(self, cqe: CqeResult) -> Self::Output { + fn complete(self, cqe: CqeResult) -> Self::Output { let mut buf = self.buf; if let CqeResult::Single(Ok(len)) = cqe { @@ -61,77 +67,115 @@ impl Op { unsafe { Op::new(read_op, Read { fd, buf }) } } - // Split file read operations by batches of size batch_size. batches will be executed in groupts - // of batch_size - pub(crate) async fn read_batch_size( + // Split file read operations by batches of size N. batches will be executed in groups + // of N. 
+ // This function will return + pub(crate) async fn read_batch_size( mut fd: OwnedFd, mut buf: Vec, len: usize, - batch_size: usize, ) -> Result, (OwnedFd, Vec)> { - // hold multiple >= batch_size length vectors of operations - let mut batches_of_ops = Vec::new(); + let mut batches_of_ops = Vec::with_capacity(len / (N * MAX_READ_SIZE)); // hold batch_size operations and reuse it - let mut n_ops = Vec::with_capacity(size_of::() * batch_size); + let mut n_ops = [const { MaybeUninit::uninit() }; N]; + let mut last_len = 0; // total number of batch entries to read the file completly for (index, start) in (0..len).step_by(MAX_READ_SIZE).enumerate() { - if (index + 1) % batch_size == 0 { - batches_of_ops.push(n_ops.clone()); - n_ops.clear(); + // push a chunk into the batches + if (index + 1) % N == 0 { + // SAFETY: the index + 1 divides N so we have written N ops. + // We can transmute those into a array of sqe entries + let entries: [Entry; N] = unsafe { std::mem::transmute_copy(&n_ops) }; + + batches_of_ops.push(entries); } else { let end = (start + MAX_READ_SIZE).min(len); // clamp to len for the final chunk - // MAX_READ_SIZE is less than u32 let len = (end - start) as u32; - let buf_mut_ptr = buf.spare_capacity_mut()[start..].as_mut_ptr().cast(); - - let op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len) + n_ops[index % N].write( + opcode::Read::new( + types::Fd(fd.as_raw_fd()), + buf.spare_capacity_mut()[start..].as_mut_ptr().cast(), + len, + ) .offset(start as u64) .build() - .flags(Flags::IO_LINK); - - n_ops.push(op); + // link our sqes so cqes arrive in order + .flags(Flags::IO_LINK), + ); } - } - - // push out the last batches - batches_of_ops.push(n_ops.clone()); - let mut read_size = 0; + last_len = index % N; // save last len for last batch + } + // Handle all full batches for batches in batches_of_ops { + // SAFETY: Batches are valid array entries let op = unsafe { Op::batch(batches, Read { fd, buf }) }; - // TODO: Maybe we can put this 
in a tokio task - let (res, r_fd, r_buf) = op.await; - - match res { - CqeResult::Batch(cqes) => { - for cqe in cqes { - match cqe { - Ok(r_size) => { - read_size += r_size as usize; - } - Err(e) => { - if e.kind() != ErrorKind::Interrupted { - return Err((r_fd, r_buf)); - } - } - } - } - } - _ => return Err((r_fd, r_buf)), - }; + let (_, r_fd, r_buf) = uring_task(op).await?; fd = r_fd; buf = r_buf; } - // SAFETY: We have read `read_size` amount - unsafe { - buf.set_len(buf.len() + read_size); + // Handle last partial batch if there is any + if last_len > 0 { + // SAFETY: This should be safe as we will always have valid Entries + // in the array at any time. We will slice it by `last_len` to avoid + // double counting / wrong cqe. + let mut entries: [Entry; N] = unsafe { std::mem::transmute_copy(&n_ops) }; + + for (i, entry) in entries.iter_mut().enumerate() { + if i > last_len { + *entry = opcode::Nop::new().build(); + } + } + + // SAFETY: Because of the loop above, the entries that are repeated + // or invalided have been Nop'ed + let (_, _, r_buf) = uring_task(unsafe { Op::batch(entries, Read { fd, buf }) }).await?; + + buf = r_buf; } Ok(buf) } } + +// Poll the batch operation and get the Output out of it +async fn uring_task(op: Op) -> Result, (OwnedFd, Vec)> { + // TODO: Maybe we can put this in a tokio task + let (res, r_fd, mut r_buf) = op.await; + + match res { + CqeResult::Batch(cqes) => { + for cqe in cqes { + let cqe = i32_to_result(cqe); + + match cqe { + Ok(r_size) => { + // SAFETY: We have read `r_size` amount + let len = if let Some(new_len) = r_buf.len().checked_add(r_size as usize) { + new_len + } else { + return Err((r_fd, r_buf)); + }; + + unsafe { + r_buf.set_len(len); + } + } + Err(e) => { + if e.kind() != ErrorKind::Interrupted { + return Err((r_fd, r_buf)); + } + } + } + } + } + _ => return Err((r_fd, r_buf)), + }; + + Ok((res, r_fd, r_buf)) +} diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index 
eea283b0492..2175ea3cf2c 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -11,6 +11,8 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, Waker}; +const DEFAULT_BATCH_SIZE: usize = u8::MAX as usize; + // This field isn't accessed directly, but it holds cancellation data, // so `#[allow(dead_code)]` is needed. #[allow(dead_code)] @@ -41,31 +43,30 @@ pub(crate) enum Lifecycle { Completed(io_uring::cqueue::Entry), } -pub(crate) enum State { +/// `const` parameter is only used for Batch +pub(crate) enum State { // Single operation state Initialize(Option), Polled(usize), // Batch operation state - InitializeBatch(Vec), - PolledBatch(Vec), + InitializeBatch([squeue::Entry; N]), + PolledBatch([usize; N]), // Batch or single operation is completed Complete, } -pub(crate) struct Op { +pub(crate) struct Op { // Handle to the runtime handle: Handle, // State of this Op - state: State, + state: State, // Per operation data. data: Option, - // indexes of slab of registered operation - indexes: Vec, // Completed CQEs stored for checking batch completion - completed: Vec>, + completed: [i32; N], } -impl Op { +impl Op { /// # Safety /// /// Callers must ensure that parameters of the entry (such as buffer) are valid and will @@ -77,20 +78,18 @@ impl Op { handle, data: Some(data), state: State::Initialize(Some(entry)), - indexes: Vec::new(), - completed: Vec::new(), + completed: [0; N], } } - pub(crate) unsafe fn batch(entries: Vec, data: T) -> Self { + pub(crate) unsafe fn batch(entries: [squeue::Entry; N], data: T) -> Self { let handle = Handle::current(); Self { handle, data: Some(data), state: State::InitializeBatch(entries), - indexes: Vec::new(), - completed: Vec::new(), + completed: [0; N], } } @@ -99,7 +98,7 @@ impl Op { } } -impl Drop for Op { +impl Drop for Op { fn drop(&mut self) { match self.state { // We've already dropped this Op. 
@@ -127,25 +126,25 @@ impl Drop for Op { } /// Result of an completed operation -pub(crate) enum CqeResult { +pub(crate) enum CqeResult { Single(io::Result), - Batch(Vec>), + Batch([i32; N]), // This is used when you want to terminate an operation with an error. InitErr(io::Error), } -impl From for CqeResult { +impl From for CqeResult { fn from(cqe: cqueue::Entry) -> Self { CqeResult::Single(cqe_to_result(cqe)) } } /// A trait that converts a CQE result into a usable value for each operation. -pub(crate) trait Completable { +pub(crate) trait Completable { type Output; // Called when a single or batch operation is completed - fn complete(self, res: CqeResult) -> Self::Output; + fn complete(self, res: CqeResult) -> Self::Output; } /// Extracts the `CancelData` needed to safely cancel an in-flight io_uring operation. @@ -153,17 +152,18 @@ pub(crate) trait Cancellable { fn cancel(self) -> CancelData; } -impl Unpin for Op {} +impl Unpin for Op {} -impl Future for Op { +impl + Send, const N: usize> Future for Op { type Output = T::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); let handle = &mut this.handle; let driver = handle.inner.driver().io(); + let state = &mut this.state; - match &mut this.state { + match state { State::Initialize(entry_opt) => { let entry = entry_opt.take().expect("Entry must be present"); let waker = cx.waker().clone(); @@ -190,10 +190,7 @@ impl Future for Op { // SAFETY: entry is valid for the entire duration of the operation match unsafe { driver.register_batch(entries, waker) } { - Ok(ids) => { - this.indexes = ids.clone(); - this.state = State::PolledBatch(ids) - } + Ok(ids) => this.state = State::PolledBatch(ids), Err(err) => { let data = this .take_data() @@ -212,7 +209,7 @@ impl Future for Op { let mut ctx = driver.get_uring().lock(); let completed = &mut this.completed; - for idx in ids.iter() { + for (i, idx) in ids.iter().enumerate() { let lifecycle = if let Some(lifecycle) = 
ctx.ops.get_mut(*idx) { lifecycle } else { @@ -233,7 +230,9 @@ impl Future for Op { Lifecycle::Completed(cqe) => { // Clean up and complete the future ctx.remove_op(*idx); - completed.push(cqe_to_result(cqe)); + // SAFETY: ids array and completed array of size N + // it is zeroed in the initialization + *unsafe { completed.get_mut(i).unwrap_unchecked() } = cqe.result(); } Lifecycle::Submitted => { @@ -246,18 +245,15 @@ impl Future for Op { } } - if ctx.check_slab_entry(ids) { + if ctx.check_slab_entry(&*ids) { this.state = State::Complete; drop(ctx); - let cqes = &mut this.completed; - let completed = std::mem::take(cqes); - let data = this .take_data() .expect("Data must be present on completion"); - return Poll::Ready(data.complete(CqeResult::Batch(completed))); + return Poll::Ready(data.complete(CqeResult::Batch(this.completed))); } Poll::Pending @@ -291,7 +287,8 @@ impl Future for Op { let data = this .take_data() .expect("Data must be present on completion"); - Poll::Ready(data.complete(cqe.into())) + + Poll::Ready(data.complete(CqeResult::Single(cqe_to_result(cqe)))) } Lifecycle::Submitted => { @@ -311,11 +308,16 @@ impl Future for Op { } } -fn cqe_to_result(cqe: cqueue::Entry) -> io::Result { +pub(crate) fn cqe_to_result(cqe: cqueue::Entry) -> io::Result { let res = cqe.result(); + + i32_to_result(res) +} + +pub(crate) fn i32_to_result(res: i32) -> std::io::Result { if res >= 0 { Ok(res as u32) } else { - Err(io::Error::from_raw_os_error(-res)) + Err(std::io::Error::from_raw_os_error(-res)) } } diff --git a/tokio/src/runtime/io/driver/uring.rs b/tokio/src/runtime/io/driver/uring.rs index bc89d6aba10..b5359a10017 100644 --- a/tokio/src/runtime/io/driver/uring.rs +++ b/tokio/src/runtime/io/driver/uring.rs @@ -1,11 +1,10 @@ use io_uring::squeue::Entry; -use io_uring::IoUring; +use io_uring::{IoUring, Probe}; use mio::unix::SourceFd; use slab::Slab; use super::{Handle, TOKEN_WAKEUP}; use crate::io::Interest; -use crate::loom::sync::atomic::Ordering; use 
crate::loom::sync::Mutex; use crate::runtime::driver::op::{Cancellable, Lifecycle}; @@ -223,25 +222,25 @@ impl Handle { // // If the IO_LINK flag is not set the completions may not arrive in order // for ordering set the IO_LINK flag before passing the entries to this function - pub(crate) unsafe fn register_batch( + // + // This function returns indexes in the slab for each Entry, to ensure all entries + // passed are completed, check against the slab for the indexes returned. + pub(crate) unsafe fn register_batch( &self, - entries: &mut [Entry], + entries: &mut [Entry; N], waker: Waker, - ) -> io::Result> { - if !self.check_and_init()? { - return Err(io::Error::from_raw_os_error(libc::ENOSYS)); - } + ) -> io::Result<[usize; N]> { + assert!(self.uring_probe.initialized()); let mut guard = self.get_uring().lock(); let ctx = &mut *guard; - let length = entries.len(); - let mut indexes: Vec = Vec::with_capacity(length); + let mut indexes = [0; N]; - for entry in entries.iter_mut() { + for (i, entry) in entries.iter_mut().enumerate() { let index = ctx.ops.insert(Lifecycle::Waiting(waker.clone())); entry.set_user_data(index as u64); - indexes.push(index); + indexes[i] = index; } let submit_or_remove = |ctx: &mut UringContext| -> io::Result<()> { From 5ac412a823227302affb677a98a18e2e9efeab09 Mon Sep 17 00:00:00 2001 From: Daksh <41485688+Daksh14@users.noreply.github.com> Date: Tue, 10 Mar 2026 18:24:29 -0400 Subject: [PATCH 4/4] io-uring: Remove all vec uses, experiment with batch size --- tokio/src/fs/read_uring.rs | 10 ++-- tokio/src/io/uring/read.rs | 118 ++++++++++++++++++------------------- 2 files changed, 63 insertions(+), 65 deletions(-) diff --git a/tokio/src/fs/read_uring.rs b/tokio/src/fs/read_uring.rs index c51909020d3..0dc6b3ce6b2 100644 --- a/tokio/src/fs/read_uring.rs +++ b/tokio/src/fs/read_uring.rs @@ -12,12 +12,12 @@ const PROBE_SIZE: usize = 32; const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32; // batch size for batched reads -const BATCH_SIZE: usize = 
128; +const BATCH_SIZE: usize = 200; // Max bytes we can read using io uring submission at a time // SAFETY: cannot be higher than u32::MAX for safe cast // Set to read max 64 MiB at time -pub(crate) const MAX_READ_SIZE: usize = 64 * 1024 * 1024; +pub(crate) const MAX_READ_SIZE: usize = 64 * 1024; pub(crate) async fn read_uring(path: &Path) -> io::Result> { let file = OpenOptions::new().read(true).open(path).await?; @@ -40,11 +40,9 @@ pub(crate) async fn read_uring(path: &Path) -> io::Result> { } async fn read_to_end_batch(fd: OwnedFd, buf: Vec) -> io::Result> { - let file_len = buf.capacity(); - // try reading in batch if we have substantial capacity - if file_len > MAX_READ_SIZE { - match Op::read_batch_size::(fd, buf, file_len).await { + if buf.capacity() > MAX_READ_SIZE { + match Op::read_batch_size::(fd, buf).await { Ok(buf) => Ok(buf), Err((r_fd, mut r_buf)) => { // clear the buffer before we write to it from start again diff --git a/tokio/src/io/uring/read.rs b/tokio/src/io/uring/read.rs index 345bb31e082..c6b14b8df79 100644 --- a/tokio/src/io/uring/read.rs +++ b/tokio/src/io/uring/read.rs @@ -67,86 +67,92 @@ impl Op { unsafe { Op::new(read_op, Read { fd, buf }) } } - // Split file read operations by batches of size N. batches will be executed in groups - // of N. - // This function will return + // Split file read operations by batches of size N. batches will be executed + // in groups of N. 
+ // + // This function will return the final buffer of bytes in the file pub(crate) async fn read_batch_size( mut fd: OwnedFd, mut buf: Vec, - len: usize, ) -> Result, (OwnedFd, Vec)> { - let mut batches_of_ops = Vec::with_capacity(len / (N * MAX_READ_SIZE)); // hold batch_size operations and reuse it let mut n_ops = [const { MaybeUninit::uninit() }; N]; let mut last_len = 0; + let mut read_len = 0; + let len = buf.capacity(); + + for i in 0..N { + n_ops[i].write(opcode::Nop::new().build()); + } + + let mut n_ops = unsafe { assume_init(n_ops) }; // total number of batch entries to read the file completely for (index, start) in (0..len).step_by(MAX_READ_SIZE).enumerate() { - // push a chunk into the batches - if (index + 1) % N == 0 { - // SAFETY: the index + 1 divides N so we have written N ops. - // We can transmute those into a array of sqe entries - let entries: [Entry; N] = unsafe { std::mem::transmute_copy(&n_ops) }; + let end = (start + MAX_READ_SIZE).min(len); // clamp to len for the final chunk + let array_idx = index % N; + + // skip first iteration + if array_idx == 0 && index != 0 { + // SAFETY: Batches are valid array entries + let op = unsafe { Op::batch(n_ops.clone(), Read { fd, buf }) }; + let (_, r_fd, r_buf) = uring_task(op, &mut read_len).await?; - batches_of_ops.push(entries); + fd = r_fd; + buf = r_buf; } else { - let end = (start + MAX_READ_SIZE).min(len); // clamp to len for the final chunk - let len = (end - start) as u32; - - n_ops[index % N].write( - opcode::Read::new( - types::Fd(fd.as_raw_fd()), - buf.spare_capacity_mut()[start..].as_mut_ptr().cast(), - len, - ) - .offset(start as u64) - .build() - // link our sqes so cqes arrive in order - .flags(Flags::IO_LINK), - ); + let op = opcode::Read::new( + types::Fd(fd.as_raw_fd()), + buf.spare_capacity_mut()[start..].as_mut_ptr().cast(), + (end - start) as u32, + ) + .offset(start as u64) + .build() + // link our sqes so cqes arrive in order + .flags(Flags::IO_LINK); + + n_ops[array_idx] = op; 
} - last_len = index % N; // save last len for last batch - } - - // Handle all full batches - for batches in batches_of_ops { - // SAFETY: Batches are valid array entries - let op = unsafe { Op::batch(batches, Read { fd, buf }) }; - let (_, r_fd, r_buf) = uring_task(op).await?; - - fd = r_fd; - buf = r_buf; + last_len = array_idx; // save last array_idx for last batch } // Handle last partial batch if there is any if last_len > 0 { - // SAFETY: This should be safe as we will always have valid Entries - // in the array at any time. We will slice it by `last_len` to avoid - // double counting / wrong cqe. - let mut entries: [Entry; N] = unsafe { std::mem::transmute_copy(&n_ops) }; - - for (i, entry) in entries.iter_mut().enumerate() { + for (i, entry) in n_ops.iter_mut().enumerate() { + // no op the double counted entries if i > last_len { *entry = opcode::Nop::new().build(); } } - // SAFETY: Because of the loop above, the entries that are repeated - // or invalided have been Nop'ed - let (_, _, r_buf) = uring_task(unsafe { Op::batch(entries, Read { fd, buf }) }).await?; + // SAFETY: Because of the no-op loop above, we can assume double entry + // read is not possible + let op = unsafe { Op::batch(n_ops, Read { fd, buf }) }; + + let (_, _, r_buf) = uring_task(op, &mut read_len).await?; buf = r_buf; } + unsafe { + buf.set_len(buf.len() + read_len); + } + Ok(buf) } } +unsafe fn assume_init(n_ops: [MaybeUninit; N]) -> [Entry; N] { + unsafe { std::mem::transmute_copy(&n_ops) } +} + // Poll the batch operation and get the Output out of it -async fn uring_task(op: Op) -> Result, (OwnedFd, Vec)> { - // TODO: Maybe we can put this in a tokio task - let (res, r_fd, mut r_buf) = op.await; +async fn uring_task( + op: Op, + read_len: &mut usize, +) -> Result, (OwnedFd, Vec)> { + let (res, r_fd, r_buf) = op.await; match res { CqeResult::Batch(cqes) => { @@ -154,18 +160,12 @@ async fn uring_task(op: Op) -> Result, (Owned let cqe = i32_to_result(cqe); match cqe { - Ok(r_size) => { 
- // SAFETY: We have read `r_size` amount - let len = if let Some(new_len) = r_buf.len().checked_add(r_size as usize) { - new_len - } else { - return Err((r_fd, r_buf)); - }; - - unsafe { - r_buf.set_len(len); + Ok(r_size) => match read_len.checked_add(r_size as usize) { + Some(len) => { + *read_len = len; } - } + None => return Err((r_fd, r_buf)), + }, Err(e) => { if e.kind() != ErrorKind::Interrupted { return Err((r_fd, r_buf));