Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions src/fs/file.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use io_uring::squeue::Flags;

use crate::buf::fixed::FixedBuf;
use crate::buf::{BoundedBuf, BoundedBufMut, IoBuf, IoBufMut, Slice};
use crate::fs::OpenOptions;
use crate::io::SharedFd;

use crate::runtime::driver::op::Op;
use crate::{UnsubmittedOneshot, UnsubmittedWrite};
use crate::{UnsubmittedOneshot, UnsubmittedWrite, UnsubmittedRead};
use std::fmt;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
Expand Down Expand Up @@ -182,6 +184,27 @@ impl File {
op.await
}

pub async fn read_at_with_flags<T: BoundedBufMut>(
&self,
buf: T,
pos: u64,
flags: Flags,
) -> crate::BufResult<usize, T> {
// Submit the read operation
let op = Op::read_at_with_flags(&self.fd, buf, pos, flags).unwrap();
op.await
}

pub fn unsubmitted_read_at_with_flags<T: BoundedBufMut>(
&self,
buf: T,
pos: u64,
flags: Flags,
) -> UnsubmittedRead<T> {
UnsubmittedOneshot::read_at_with_flags(&self.fd, buf, pos, flags)
}


/// Read some bytes at the specified offset from the file into the specified
/// array of buffers, returning how many bytes were read.
///
Expand Down Expand Up @@ -540,7 +563,16 @@ impl File {
///
/// [`Ok(n)`]: Ok
pub fn write_at<T: BoundedBuf>(&self, buf: T, pos: u64) -> UnsubmittedWrite<T> {
UnsubmittedOneshot::write_at(&self.fd, buf, pos)
self.write_at_with_flags(buf, pos, Flags::empty())
}

pub fn write_at_with_flags<T: BoundedBuf>(
&self,
buf: T,
pos: u64,
flags: Flags,
) -> UnsubmittedWrite<T> {
UnsubmittedOneshot::write_at_with_flags(&self.fd, buf, pos, flags)
}

/// Attempts to write an entire buffer into this file at the specified offset.
Expand Down
2 changes: 1 addition & 1 deletion src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(crate) use noop::NoOp;

mod open;

mod read;
pub(crate) mod read;

mod read_fixed;

Expand Down
73 changes: 73 additions & 0 deletions src/io/read.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,73 @@
use io_uring::squeue::Flags;

use crate::buf::BoundedBufMut;
use crate::{OneshotOutputTransform, UnsubmittedOneshot};
use io_uring::cqueue::Entry;
use crate::io::SharedFd;
use crate::BufResult;

use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use std::io;
use std::marker::PhantomData;

/// An unsubmitted read operation.
pub type UnsubmittedRead<T> = UnsubmittedOneshot<ReadData<T>, ReadTransform<T>>;

#[allow(missing_docs)]
pub struct ReadData<T> {
/// Holds a strong ref to the FD, preventing the file from being closed
/// while the operation is in-flight.
_fd: SharedFd,

buf: T,
}


#[allow(missing_docs)]
pub struct ReadTransform<T> {
_phantom: PhantomData<T>,
}

impl<T> OneshotOutputTransform for ReadTransform<T> {
type Output = BufResult<usize, T>;
type StoredData = ReadData<T>;

fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output {
let res = if cqe.result() >= 0 {
Ok(cqe.result() as usize)
} else {
Err(io::Error::from_raw_os_error(-cqe.result()))
};

(res, data.buf)
}
}

impl<T: BoundedBufMut> UnsubmittedRead<T> {
pub(crate) fn read_at_with_flags(fd: &SharedFd, mut buf: T, offset: u64, flags: Flags) -> Self {
use io_uring::{opcode, types};

// Get raw buffer info
let ptr = buf.stable_mut_ptr();
let len = buf.bytes_init();

Self::new(
ReadData {
_fd: fd.clone(),
buf,
},
ReadTransform {
_phantom: PhantomData,
},
opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.build()
.flags(flags),
)
}

}

pub(crate) struct Read<T> {
/// Holds a strong ref to the FD, preventing the file from being closed
Expand All @@ -18,6 +81,15 @@ pub(crate) struct Read<T> {

impl<T: BoundedBufMut> Op<Read<T>> {
pub(crate) fn read_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result<Op<Read<T>>> {
Self::read_at_with_flags(fd, buf, offset, Flags::empty())
}

pub(crate) fn read_at_with_flags(
fd: &SharedFd,
buf: T,
offset: u64,
flags: Flags,
) -> io::Result<Op<Read<T>>> {
use io_uring::{opcode, types};

CONTEXT.with(|x| {
Expand All @@ -33,6 +105,7 @@ impl<T: BoundedBufMut> Op<Read<T>> {
opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.build()
.flags(flags)
},
)
})
Expand Down
23 changes: 23 additions & 0 deletions src/io/write.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{buf::BoundedBuf, io::SharedFd, BufResult, OneshotOutputTransform, UnsubmittedOneshot};
use io_uring::cqueue::Entry;
use io_uring::squeue::Flags;
use std::io;
use std::marker::PhantomData;

Expand Down Expand Up @@ -56,4 +57,26 @@ impl<T: BoundedBuf> UnsubmittedWrite<T> {
.build(),
)
}

pub(crate) fn write_at_with_flags(fd: &SharedFd, buf: T, offset: u64, flags: Flags) -> Self {
use io_uring::{opcode, types};

// Get raw buffer info
let ptr = buf.stable_ptr();
let len = buf.bytes_init();

Self::new(
WriteData {
_fd: fd.clone(),
buf,
},
WriteTransform {
_phantom: PhantomData,
},
opcode::Write::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.build()
.flags(flags),
)
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub mod fs;
pub mod net;

pub use io::write::*;
pub use io::read::*;
pub use runtime::driver::op::{InFlightOneshot, OneshotOutputTransform, UnsubmittedOneshot};
pub use runtime::spawn;
pub use runtime::Runtime;
Expand Down
124 changes: 124 additions & 0 deletions tests/link.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::{task::Context, io::Write, os::unix::prelude::OpenOptionsExt};

use futures_util::{future::{Either, BoxFuture, join_all}, Future, FutureExt};
use io_uring::squeue::Flags;
use tempfile::NamedTempFile;
use tokio_uring::{fs::File, buf::IoBufMut, BufResult};
use std::task::Poll;

struct UnsafeBuffer {
addr : *mut u8
}

unsafe impl tokio_uring::buf::IoBuf for UnsafeBuffer {
fn stable_ptr(&self) -> *const u8 {
self.addr
}

fn bytes_init(&self) -> usize {
std::mem::size_of::<u8>()
}

fn bytes_total(&self) -> usize {
std::mem::size_of::<u8>()
}
}

unsafe impl IoBufMut for UnsafeBuffer {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.addr
}
unsafe fn set_init(&mut self, _pos: usize) {}
}

fn tempfile() -> NamedTempFile {
NamedTempFile::new().unwrap()
}

#[test]
fn multiple_write() {
tokio_uring::start(async {
let tempfile = tempfile();
let file = std::fs::OpenOptions::new().read(true).write(true)
.custom_flags(libc::O_NONBLOCK)
.open(tempfile.path()).unwrap();
let file = tokio_uring::fs::File::from_std(file);
let mut tasks = Vec::new();

for i in 0..20 {
let buf = i.to_string();
// let write_task = file.write_at(buf.into_bytes(), 0).submit();
let write_task = file.write_at_with_flags(buf.into_bytes(), 0, Flags::IO_LINK).submit();
let task = tokio_uring::spawn(write_task);
tasks.push(task);
}



let file = File::open(tempfile.path()).await.unwrap();
let _ = futures_util::future::join_all(tasks).await;

let buf = vec![0u8, 10];
let (result, buf) = file.read_at(buf, 0).await;
result.unwrap();
let str = String::from_utf8(buf).unwrap();
assert_eq!(&str, "19");
});
}


#[test]
fn multiple_read() {
tokio_uring::start(async {
let tempfile = tempfile();
let file = std::fs::OpenOptions::new().read(true).write(true)
.custom_flags(libc::O_NONBLOCK)
.open(tempfile.path()).unwrap();
let file = tokio_uring::fs::File::from_std(file);
let mut tasks = Vec::new();

for i in 0..20 {
let buf = i.to_string();
let read_task = file.unsubmitted_read_at_with_flags(buf.into_bytes(), 0, Flags::IO_LINK).submit();
let task = tokio_uring::spawn(read_task);
tasks.push(task);
}

let file = File::open(tempfile.path()).await.unwrap();
let _ = futures_util::future::join_all(tasks).await;

let buf = vec![0u8, 10];
let (result, buf) = file.read_at(buf, 0).await;
result.unwrap();
let str = String::from_utf8(buf).unwrap();
assert_eq!(&str, "19");
});
}


#[test]
fn mix_read_write() {
tokio_uring::start(async {
let tempfile = tempfile();
let file = std::fs::OpenOptions::new().read(true).write(true)
.custom_flags(libc::O_NONBLOCK)
.open(tempfile.path()).unwrap();
let file = tokio_uring::fs::File::from_std(file);

let mut buffer = 0u8;

let mut tasks = Vec::new();
for i in 0..100 {
let buf = UnsafeBuffer { addr: std::ptr::addr_of_mut!(buffer), };
let write_task = file.write_at_with_flags(buf, 0, Flags::IO_LINK).submit();
tasks.push(tokio_uring::spawn(write_task));

let buf = UnsafeBuffer { addr: std::ptr::addr_of_mut!(buffer), };
let read_task = file.unsubmitted_read_at_with_flags(buf, 0, Flags::IO_LINK).submit();
tasks.push(tokio_uring::spawn(read_task));
}



});
}