From ae2fe5a06ea3412a1022e595fa1444139c155b9f Mon Sep 17 00:00:00 2001 From: Sidong Yang Date: Tue, 26 Dec 2023 17:42:48 +0900 Subject: [PATCH] add flags --- src/fs/file.rs | 36 +++++++++++++- src/io/mod.rs | 2 +- src/io/read.rs | 73 ++++++++++++++++++++++++++++ src/io/write.rs | 23 +++++++++ src/lib.rs | 1 + tests/link.rs | 124 ++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 256 insertions(+), 3 deletions(-) create mode 100644 tests/link.rs diff --git a/src/fs/file.rs b/src/fs/file.rs index 9cd47f21..79381b8b 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -1,10 +1,12 @@ +use io_uring::squeue::Flags; + use crate::buf::fixed::FixedBuf; use crate::buf::{BoundedBuf, BoundedBufMut, IoBuf, IoBufMut, Slice}; use crate::fs::OpenOptions; use crate::io::SharedFd; use crate::runtime::driver::op::Op; -use crate::{UnsubmittedOneshot, UnsubmittedWrite}; +use crate::{UnsubmittedOneshot, UnsubmittedWrite, UnsubmittedRead}; use std::fmt; use std::io; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; @@ -182,6 +184,27 @@ impl File { op.await } + pub async fn read_at_with_flags( + &self, + buf: T, + pos: u64, + flags: Flags, + ) -> crate::BufResult { + // Submit the read operation + let op = Op::read_at_with_flags(&self.fd, buf, pos, flags).unwrap(); + op.await + } + + pub fn unsubmitted_read_at_with_flags( + &self, + buf: T, + pos: u64, + flags: Flags, + ) -> UnsubmittedRead { + UnsubmittedOneshot::read_at_with_flags(&self.fd, buf, pos, flags) + } + + /// Read some bytes at the specified offset from the file into the specified /// array of buffers, returning how many bytes were read. /// @@ -540,7 +563,16 @@ impl File { /// /// [`Ok(n)`]: Ok pub fn write_at(&self, buf: T, pos: u64) -> UnsubmittedWrite { - UnsubmittedOneshot::write_at(&self.fd, buf, pos) + self.write_at_with_flags(buf, pos, Flags::empty()) + } + + pub fn write_at_with_flags( + &self, + buf: T, + pos: u64, + flags: Flags, + ) -> UnsubmittedWrite { + UnsubmittedOneshot::write_at_with_flags(&self.fd, buf, pos, flags) } /// Attempts to write an entire buffer into this file at the specified offset. diff --git a/src/io/mod.rs b/src/io/mod.rs index 6985bdd3..4b2272a7 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -15,7 +15,7 @@ pub(crate) use noop::NoOp; mod open; -mod read; +pub(crate) mod read; mod read_fixed; diff --git a/src/io/read.rs b/src/io/read.rs index c3395b40..52418ed7 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -1,10 +1,73 @@ +use io_uring::squeue::Flags; + use crate::buf::BoundedBufMut; +use crate::{OneshotOutputTransform, UnsubmittedOneshot}; +use io_uring::cqueue::Entry; use crate::io::SharedFd; use crate::BufResult; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; use std::io; +use std::marker::PhantomData; + +/// An unsubmitted read operation. +pub type UnsubmittedRead = UnsubmittedOneshot, ReadTransform>; + +#[allow(missing_docs)] +pub struct ReadData { + /// Holds a strong ref to the FD, preventing the file from being closed + /// while the operation is in-flight. + _fd: SharedFd, + + buf: T, +} + + +#[allow(missing_docs)] +pub struct ReadTransform { + _phantom: PhantomData, +} + +impl OneshotOutputTransform for ReadTransform { + type Output = BufResult; + type StoredData = ReadData; + + fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output { + let res = if cqe.result() >= 0 { + Ok(cqe.result() as usize) + } else { + Err(io::Error::from_raw_os_error(-cqe.result())) + }; + + (res, data.buf) + } +} + +impl UnsubmittedRead { + pub(crate) fn read_at_with_flags(fd: &SharedFd, mut buf: T, offset: u64, flags: Flags) -> Self { + use io_uring::{opcode, types}; + + // Get raw buffer info + let ptr = buf.stable_mut_ptr(); + let len = buf.bytes_init(); + + Self::new( + ReadData { + _fd: fd.clone(), + buf, + }, + ReadTransform { + _phantom: PhantomData, + }, + opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _) + .offset(offset as _) + .build() + .flags(flags), + ) + } + +} pub(crate) struct Read { /// Holds a strong ref to the FD, preventing the file from being closed @@ -18,6 +81,15 @@ pub(crate) struct Read { impl Op> { pub(crate) fn read_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result>> { + Self::read_at_with_flags(fd, buf, offset, Flags::empty()) + } + + pub(crate) fn read_at_with_flags( + fd: &SharedFd, + buf: T, + offset: u64, + flags: Flags, + ) -> io::Result>> { use io_uring::{opcode, types}; CONTEXT.with(|x| { @@ -33,6 +105,7 @@ impl Op> { opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _) .offset(offset as _) .build() + .flags(flags) }, ) }) diff --git a/src/io/write.rs b/src/io/write.rs index 6c607f75..4273a15a 100644 --- a/src/io/write.rs +++ b/src/io/write.rs @@ -1,5 +1,6 @@ use crate::{buf::BoundedBuf, io::SharedFd, BufResult, OneshotOutputTransform, UnsubmittedOneshot}; use io_uring::cqueue::Entry; +use io_uring::squeue::Flags; use std::io; use std::marker::PhantomData; @@ -56,4 +57,26 @@ impl UnsubmittedWrite { .build(), ) } + + pub(crate) fn write_at_with_flags(fd: &SharedFd, buf: T, offset: u64, flags: Flags) -> Self { + use io_uring::{opcode, types}; + + // Get raw buffer info + let ptr = buf.stable_ptr(); + let len = buf.bytes_init(); + + Self::new( + WriteData { + _fd: fd.clone(), + buf, + }, + WriteTransform { + _phantom: PhantomData, + }, + opcode::Write::new(types::Fd(fd.raw_fd()), ptr, len as _) + .offset(offset as _) + .build() + .flags(flags), + ) + } } diff --git a/src/lib.rs b/src/lib.rs index d1cc6e02..ba70995e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,6 +79,7 @@ pub mod fs; pub mod net; pub use io::write::*; +pub use io::read::*; pub use runtime::driver::op::{InFlightOneshot, OneshotOutputTransform, UnsubmittedOneshot}; pub use runtime::spawn; pub use runtime::Runtime; diff --git a/tests/link.rs b/tests/link.rs new file mode 100644 index 00000000..c02de447 --- /dev/null +++ b/tests/link.rs @@ -0,0 +1,124 @@ +use std::{task::Context, io::Write, os::unix::prelude::OpenOptionsExt}; + +use futures_util::{future::{Either, BoxFuture, join_all}, Future, FutureExt}; +use io_uring::squeue::Flags; +use tempfile::NamedTempFile; +use tokio_uring::{fs::File, buf::IoBufMut, BufResult}; +use std::task::Poll; + +struct UnsafeBuffer { + addr : *mut u8 +} + +unsafe impl tokio_uring::buf::IoBuf for UnsafeBuffer { + fn stable_ptr(&self) -> *const u8 { + self.addr + } + + fn bytes_init(&self) -> usize { + std::mem::size_of::() + } + + fn bytes_total(&self) -> usize { + std::mem::size_of::() + } +} + +unsafe impl IoBufMut for UnsafeBuffer { + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.addr + } + unsafe fn set_init(&mut self, _pos: usize) {} +} + +fn tempfile() -> NamedTempFile { + NamedTempFile::new().unwrap() +} + +#[test] +fn multiple_write() { + tokio_uring::start(async { + let tempfile = tempfile(); + let file = std::fs::OpenOptions::new().read(true).write(true) + .custom_flags(libc::O_NONBLOCK) + .open(tempfile.path()).unwrap(); + let file = tokio_uring::fs::File::from_std(file); + let mut tasks = Vec::new(); + + for i in 0..20 { + let buf = i.to_string(); + // let write_task = file.write_at(buf.into_bytes(), 0).submit(); + let write_task = file.write_at_with_flags(buf.into_bytes(), 0, Flags::IO_LINK).submit(); + let task = tokio_uring::spawn(write_task); + tasks.push(task); + } + + + + let file = File::open(tempfile.path()).await.unwrap(); + let _ = futures_util::future::join_all(tasks).await; + + let buf = vec![0u8, 10]; + let (result, buf) = file.read_at(buf, 0).await; + result.unwrap(); + let str = String::from_utf8(buf).unwrap(); + assert_eq!(&str, "19"); + }); +} + + +#[test] +fn multiple_read() { + tokio_uring::start(async { + let tempfile = tempfile(); + let file = std::fs::OpenOptions::new().read(true).write(true) + .custom_flags(libc::O_NONBLOCK) + .open(tempfile.path()).unwrap(); + let file = tokio_uring::fs::File::from_std(file); + let mut tasks = Vec::new(); + + for i in 0..20 { + let buf = i.to_string(); + let read_task = file.unsubmitted_read_at_with_flags(buf.into_bytes(), 0, Flags::IO_LINK).submit(); + let task = tokio_uring::spawn(read_task); + tasks.push(task); + } + + let file = File::open(tempfile.path()).await.unwrap(); + let _ = futures_util::future::join_all(tasks).await; + + let buf = vec![0u8, 10]; + let (result, buf) = file.read_at(buf, 0).await; + result.unwrap(); + let str = String::from_utf8(buf).unwrap(); + assert_eq!(&str, "19"); + }); +} + + +#[test] +fn mix_read_write() { + tokio_uring::start(async { + let tempfile = tempfile(); + let file = std::fs::OpenOptions::new().read(true).write(true) + .custom_flags(libc::O_NONBLOCK) + .open(tempfile.path()).unwrap(); + let file = tokio_uring::fs::File::from_std(file); + + let mut buffer = 0u8; + + let mut tasks = Vec::new(); + for i in 0..100 { + let buf = UnsafeBuffer { addr: std::ptr::addr_of_mut!(buffer), }; + let write_task = file.write_at_with_flags(buf, 0, Flags::IO_LINK).submit(); + tasks.push(tokio_uring::spawn(write_task)); + + let buf = UnsafeBuffer { addr: std::ptr::addr_of_mut!(buffer), }; + let read_task = file.unsubmitted_read_at_with_flags(buf, 0, Flags::IO_LINK).submit(); + tasks.push(tokio_uring::spawn(read_task)); + } + + + + }); +}