Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions tokio/src/fs/read_uring.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::fs::OpenOptions;
use crate::runtime::driver::op::Op;
use crate::runtime::driver::op::{CqeResult, Op};

use std::io;
use std::io::ErrorKind;
Expand All @@ -11,16 +11,19 @@ use std::path::Path;
const PROBE_SIZE: usize = 32;
const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32;

// batch size for batched reads
const BATCH_SIZE: usize = 200;

// Max bytes we can read using io uring submission at a time
// SAFETY: cannot be higher than u32::MAX for safe cast
// Set to read max 64 KiB at a time
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says "max 64 MiB" but MAX_READ_SIZE is now 64 * 1024 (64 KiB); can we confirm whether the constant or the comment should be updated? This value also changes batching eligibility (read_to_end_batch) and the per-op chunk size in read_to_end.

Severity: low

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The comment indicates a read size of 64 MiB, but the constant MAX_READ_SIZE is set to 64 KiB. The comment should be updated to reflect the actual value to avoid confusion.

Suggested change
// Set to read max 64 MiB at time
// Set to read max 64 KiB at time

const MAX_READ_SIZE: usize = 64 * 1024 * 1024;
pub(crate) const MAX_READ_SIZE: usize = 64 * 1024;
Comment on lines 17 to +20
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Comment/value mismatch: MAX_READ_SIZE is 64 KiB, not 64 MiB.

The comment says "Set to read max 64 MiB at time" but 64 * 1024 = 65,536 bytes = 64 KiB.

Fix comment
 // Max bytes we can read using io uring submission at a time
 // SAFETY: cannot be higher than u32::MAX for safe cast
-// Set to read max 64 MiB at time
+// Set to read max 64 KiB at a time
 pub(crate) const MAX_READ_SIZE: usize = 64 * 1024;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio/src/fs/read_uring.rs` around lines 17 - 20, The comment on
MAX_READ_SIZE incorrectly states 64 MiB while the value 64 * 1024 equals 64 KiB;
update the comment to reflect 64 KiB (or change the constant to 64 * 1024 * 1024
if the intent was 64 MiB). Specifically, modify the doc line "Set to read max 64
MiB at time" to "Set to read max 64 KiB at a time" (or adjust the constant if
you want MiB) while keeping the SAFETY note about u32::MAX consistent with the
chosen value for MAX_READ_SIZE.


pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
let file = OpenOptions::new().read(true).open(path).await?;

// TODO: use io uring in the future to obtain metadata
let size_hint: Option<usize> = file.metadata().await.map(|m| m.len() as usize).ok();
let size_hint = file.metadata().await.map(|m| m.len() as usize).ok();

let fd: OwnedFd = file
.try_into_std()
Expand All @@ -33,10 +36,26 @@ pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
buf.try_reserve(size_hint)?;
}

read_to_end_uring(fd, buf).await
read_to_end_batch(fd, buf).await
}

// Reads the file to the end, preferring the batched io_uring path when the
// destination buffer already has enough capacity to make batching worthwhile.
//
// Falls back to the sequential `read_to_end` path either when the buffer is
// small or when the batched read fails, in which case the partially-written
// buffer is cleared before retrying from the start of the file.
async fn read_to_end_batch(fd: OwnedFd, buf: Vec<u8>) -> io::Result<Vec<u8>> {
    // Small buffers go straight to the sequential path.
    if buf.capacity() <= MAX_READ_SIZE {
        return read_to_end(fd, buf).await;
    }

    match Op::read_batch_size::<BATCH_SIZE>(fd, buf).await {
        Ok(filled) => Ok(filled),
        Err((fd, mut buf)) => {
            // The batch attempt may have written partial data; discard it so
            // the sequential retry writes from the start again.
            buf.clear();
            read_to_end(fd, buf).await
        }
    }
}

async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec<u8>) -> io::Result<Vec<u8>> {
async fn read_to_end(mut fd: OwnedFd, mut buf: Vec<u8>) -> io::Result<Vec<u8>> {
let mut offset = 0;
let start_cap = buf.capacity();

Expand Down Expand Up @@ -119,16 +138,20 @@ async fn op_read(
let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, *offset).await;

match res {
Err(e) if e.kind() == ErrorKind::Interrupted => {
buf = r_buf;
fd = r_fd;
}
Err(e) => return Err(e),
Ok(size_read) => {
CqeResult::Single(Ok(size_read)) => {
*offset += size_read as u64;

return Ok((r_fd, r_buf, size_read == 0));
}
CqeResult::InitErr(e) | CqeResult::Single(Err(e)) => {
if e.kind() == ErrorKind::Interrupted {
buf = r_buf;
fd = r_fd;
} else {
return Err(e);
}
}
CqeResult::Batch(_) => return Err(ErrorKind::Unsupported.into()),
}
}
}
14 changes: 7 additions & 7 deletions tokio/src/io/uring/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult

use io_uring::{opcode, types};
use std::ffi::CString;
use std::io::{self, Error};
use std::io::{self, ErrorKind};
use std::os::fd::FromRawFd;
use std::path::Path;

Expand All @@ -19,13 +19,13 @@ pub(crate) struct Open {

impl Completable for Open {
type Output = io::Result<crate::fs::File>;
fn complete(self, cqe: CqeResult) -> Self::Output {
cqe.result
.map(|fd| unsafe { crate::fs::File::from_raw_fd(fd as i32) })
}

fn complete_with_error(self, err: Error) -> Self::Output {
Err(err)
fn complete(self, cqe: CqeResult) -> Self::Output {
match cqe {
CqeResult::Single(Ok(fd)) => Ok(unsafe { crate::fs::File::from_raw_fd(fd as i32) }),
CqeResult::Batch(_) => Err(ErrorKind::Unsupported.into()),
CqeResult::InitErr(err) | CqeResult::Single(Err(err)) => Err(err),
}
}
}

Expand Down
140 changes: 130 additions & 10 deletions tokio/src/io/uring/read.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,40 @@
use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};
use crate::fs::read_uring::MAX_READ_SIZE;
use crate::runtime::driver::op::{
i32_to_result, CancelData, Cancellable, Completable, CqeResult, Op,
};

use io_uring::squeue::{Entry, Flags};
use io_uring::{opcode, types};
use std::io::{self, Error};

use std::io::ErrorKind;
use std::mem::MaybeUninit;
use std::os::fd::{AsRawFd, OwnedFd};

type Output<const N: usize> = (CqeResult<N>, OwnedFd, Vec<u8>);

#[derive(Debug)]
pub(crate) struct Read {
fd: OwnedFd,
buf: Vec<u8>,
}

impl Completable for Read {
type Output = (io::Result<u32>, OwnedFd, Vec<u8>);
impl<const N: usize> Completable<N> for Read {
type Output = Output<N>;

fn complete(self, cqe: CqeResult) -> Self::Output {
fn complete(self, cqe: CqeResult<N>) -> Self::Output {
let mut buf = self.buf;

if let Ok(len) = cqe.result {
if let CqeResult::Single(Ok(len)) = cqe {
// increase length of buffer on successful
// completion
let new_len = buf.len() + len as usize;
// SAFETY: Kernel read len bytes
unsafe { buf.set_len(new_len) };
}

(cqe.result, self.fd, buf)
}
// Handle rest each batch outside

fn complete_with_error(self, err: Error) -> Self::Output {
(Err(err), self.fd, self.buf)
(cqe, self.fd, buf)
}
}

Expand Down Expand Up @@ -58,4 +66,116 @@ impl Op<Read> {
// SAFETY: Parameters are valid for the entire duration of the operation
unsafe { Op::new(read_op, Read { fd, buf }) }
}

// Split file read operations into batches of size N. Batches will be
// executed in groups of N.
//
// This function will return the final buffer of bytes in the file
pub(crate) async fn read_batch_size<const N: usize>(
mut fd: OwnedFd,
mut buf: Vec<u8>,
) -> Result<Vec<u8>, (OwnedFd, Vec<u8>)> {
// hold batch_size operations and reuse it
let mut n_ops = [const { MaybeUninit::uninit() }; N];
let mut last_len = 0;
let mut read_len = 0;
let len = buf.capacity();

for i in 0..N {
n_ops[i].write(opcode::Nop::new().build());
}

let mut n_ops = unsafe { assume_init(n_ops) };
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The custom assume_init function uses unsafe transmute_copy. A safer, idiomatic way to initialize the array is by using the safe MaybeUninit::array_assume_init function, which is stable since Rust 1.63. This avoids the need for a custom unsafe helper function.

You can remove the assume_init function entirely with this change.

        let mut n_ops = MaybeUninit::array_assume_init(n_ops);


// total number of batch entries to read the file completely
for (index, start) in (0..len).step_by(MAX_READ_SIZE).enumerate() {
let end = (start + MAX_READ_SIZE).min(len); // clamp to len for the final chunk
let array_idx = index % N;

// skip first iteration
if array_idx == 0 && index != 0 {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When array_idx == 0 && index != 0, this branch submits the previous batch but doesn't enqueue a read for the current start, so every Nth chunk is skipped and n_ops[0] may remain a stale entry for later submissions. That can truncate or corrupt the returned buffer for files that span more than N chunks.

Severity: high

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

// SAFETY: Batches are valid array entries
let op = unsafe { Op::batch(n_ops.clone(), Read { fd, buf }) };
let (_, r_fd, r_buf) = uring_task(op, &mut read_len).await?;

fd = r_fd;
buf = r_buf;
} else {
let op = opcode::Read::new(
types::Fd(fd.as_raw_fd()),
buf.spare_capacity_mut()[start..].as_mut_ptr().cast(),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

The index start is calculated based on the buffer's capacity, but it is used to index into the buffer's spare capacity. If the buffer is not empty, buf.spare_capacity_mut()[start..] will eventually attempt to access an index out of bounds of the spare capacity, leading to a panic. This is because spare_capacity_mut().len() is capacity - len, and start can go up to capacity.

(end - start) as u32,
)
.offset(start as u64)
.build()
// link our sqes so cqes arrive in order
.flags(Flags::IO_LINK);

n_ops[array_idx] = op;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Batch loop skips every Nth chunk causing data loss

High Severity

When array_idx == 0 && index != 0, the if branch submits the previous batch but the else branch that creates the read op for the current chunk is skipped entirely. This means every chunk at a batch boundary (indices N, 2N, 3N…) is silently dropped. Additionally, n_ops[0] retains the stale entry from the previous batch, causing chunk 0 to be re-read in subsequent batches. When total chunks equals kN+1, last_len ends as 0, so the final if last_len > 0 guard prevents submission of the last chunk too.

Additional Locations (1)
Fix in Cursor Fix in Web


last_len = array_idx; // save last array_idx for last batch
}
Comment on lines +91 to +118
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Off-by-one issue: First entry of subsequent batches is skipped.

When array_idx == 0 && index != 0, the code submits the current n_ops batch before setting n_ops[0] for the current iteration. This means:

  1. The first batch correctly gets entries at indices 0 to N-1
  2. When starting the second batch (index == N), array_idx is 0, so it submits the previous batch (correct), but then the current read operation for index == N is never added to n_ops—the loop continues to the next iteration

The logic should either:

  • Submit the batch, then fall through to set n_ops[array_idx], or
  • Restructure to submit after the array is filled
Suggested fix
         for (index, start) in (0..len).step_by(MAX_READ_SIZE).enumerate() {
             let end = (start + MAX_READ_SIZE).min(len); // clamp to len for the final chunk
             let array_idx = index % N;

             // skip first iteration
             if array_idx == 0 && index != 0 {
                 // SAFETY: Batches are valid array entries
                 let op = unsafe { Op::batch(n_ops.clone(), Read { fd, buf }) };
                 let (_, r_fd, r_buf) = uring_task(op, &mut read_len).await?;

                 fd = r_fd;
                 buf = r_buf;
-            } else {
-                let op = opcode::Read::new(
-                    types::Fd(fd.as_raw_fd()),
-                    buf.spare_capacity_mut()[start..].as_mut_ptr().cast(),
-                    (end - start) as u32,
-                )
-                .offset(start as u64)
-                .build()
-                // link our sqes so cqes arrive in order
-                .flags(Flags::IO_LINK);
-
-                n_ops[array_idx] = op;
             }
+
+            let op = opcode::Read::new(
+                types::Fd(fd.as_raw_fd()),
+                buf.spare_capacity_mut()[start..].as_mut_ptr().cast(),
+                (end - start) as u32,
+            )
+            .offset(start as u64)
+            .build()
+            // link our sqes so cqes arrive in order
+            .flags(Flags::IO_LINK);
+
+            n_ops[array_idx] = op;

             last_len = array_idx; // save last array_idx for last batch
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for (index, start) in (0..len).step_by(MAX_READ_SIZE).enumerate() {
let end = (start + MAX_READ_SIZE).min(len); // clamp to len for the final chunk
let array_idx = index % N;
// skip first iteration
if array_idx == 0 && index != 0 {
// SAFETY: Batches are valid array entries
let op = unsafe { Op::batch(n_ops.clone(), Read { fd, buf }) };
let (_, r_fd, r_buf) = uring_task(op, &mut read_len).await?;
fd = r_fd;
buf = r_buf;
} else {
let op = opcode::Read::new(
types::Fd(fd.as_raw_fd()),
buf.spare_capacity_mut()[start..].as_mut_ptr().cast(),
(end - start) as u32,
)
.offset(start as u64)
.build()
// link our sqes so cqes arrive in order
.flags(Flags::IO_LINK);
n_ops[array_idx] = op;
}
last_len = array_idx; // save last array_idx for last batch
}
for (index, start) in (0..len).step_by(MAX_READ_SIZE).enumerate() {
let end = (start + MAX_READ_SIZE).min(len); // clamp to len for the final chunk
let array_idx = index % N;
// skip first iteration
if array_idx == 0 && index != 0 {
// SAFETY: Batches are valid array entries
let op = unsafe { Op::batch(n_ops.clone(), Read { fd, buf }) };
let (_, r_fd, r_buf) = uring_task(op, &mut read_len).await?;
fd = r_fd;
buf = r_buf;
}
let op = opcode::Read::new(
types::Fd(fd.as_raw_fd()),
buf.spare_capacity_mut()[start..].as_mut_ptr().cast(),
(end - start) as u32,
)
.offset(start as u64)
.build()
// link our sqes so cqes arrive in order
.flags(Flags::IO_LINK);
n_ops[array_idx] = op;
last_len = array_idx; // save last array_idx for last batch
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio/src/io/uring/read.rs` around lines 91 - 118, The loop currently skips
assigning the current iteration's op when array_idx == 0 && index != 0 because
the code submits the previous batch in the if branch and uses else for
assignment; change the control flow so the batch submission (unsafe {
Op::batch(n_ops.clone(), Read { fd, buf }) } + uring_task(...)) happens first
when array_idx == 0 && index != 0 but does NOT use else — after submitting,
continue to construct the Read opcode
(opcode::Read::new(...).offset(...).build().flags(Flags::IO_LINK)) and assign it
into n_ops[array_idx]; keep updating last_len = array_idx as before so the final
batch handling still works. This ensures the first entry of each subsequent
batch is populated instead of skipped.

Comment on lines +91 to +118
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-critical critical

The batching logic in read_batch_size is flawed, leading to a critical information leak vulnerability. When a full batch is submitted (array_idx == 0 && index != 0), the else branch is skipped, meaning the opcode::Read for that iteration is not created and not added to n_ops. This effectively skips a chunk of the file and creates gaps in the buffer that are never written to by the kernel. Since read_len is updated with the total bytes read by the submitted operations, and buf.set_len is called at the end, the buffer's length will include these uninitialized gaps, exposing potentially sensitive data.

            if array_idx == 0 && index != 0 {
                // SAFETY: Batches are valid array entries
                let op = unsafe { Op::batch(n_ops.clone(), Read { fd, buf }) };
                let (_, r_fd, r_buf) = uring_task(op, &mut read_len).await?;

                fd = r_fd;
                buf = r_buf;
            }
            let op = opcode::Read::new(
                types::Fd(fd.as_raw_fd()),
                buf.spare_capacity_mut()[start..].as_mut_ptr().cast(),
                (end - start) as u32,
            )
            .offset(start as u64)
            .build()
            // link our sqes so cqes arrive in order
            .flags(Flags::IO_LINK);

            n_ops[array_idx] = op;


// Handle last partial batch if there is any
if last_len > 0 {
for (i, entry) in n_ops.iter_mut().enumerate() {
// no op the double counted entries
if i > last_len {
*entry = opcode::Nop::new().build();
}
}

// SAFETY: Because of the no-op loop above, we can assume double entry
// read is not possible
let op = unsafe { Op::batch(n_ops, Read { fd, buf }) };

let (_, _, r_buf) = uring_task(op, &mut read_len).await?;

buf = r_buf;
}

unsafe {
buf.set_len(buf.len() + read_len);
}

Ok(buf)
}
}

// Converts an array of `MaybeUninit<Entry>` into an array of `Entry`.
//
// `transmute_copy` is used because plain `mem::transmute` cannot relate the
// sizes of arrays whose length is a const generic parameter.
//
// SAFETY: the caller must guarantee that every one of the `N` elements of
// `n_ops` has been fully initialized before calling this function; reading
// an uninitialized `Entry` is undefined behavior.
unsafe fn assume_init<const N: usize>(n_ops: [MaybeUninit<Entry>; N]) -> [Entry; N] {
    unsafe { std::mem::transmute_copy(&n_ops) }
}

// Poll the batch operation and get the Output out of it
async fn uring_task<const N: usize>(
op: Op<Read, N>,
read_len: &mut usize,
) -> Result<Output<N>, (OwnedFd, Vec<u8>)> {
let (res, r_fd, r_buf) = op.await;

match res {
CqeResult::Batch(cqes) => {
for cqe in cqes {
let cqe = i32_to_result(cqe);

match cqe {
Ok(r_size) => match read_len.checked_add(r_size as usize) {
Some(len) => {
*read_len = len;
}
None => return Err((r_fd, r_buf)),
},
Err(e) => {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In uring_task, ErrorKind::Interrupted results are currently ignored, which can leave unread holes in the buffer while still returning Ok(buf). Consider treating Interrupted as a retry/fallback condition (similar to op_read) to avoid returning partial/corrupt data.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

if e.kind() != ErrorKind::Interrupted {
return Err((r_fd, r_buf));
}
}
}
}
}
_ => return Err((r_fd, r_buf)),
};

Ok((res, r_fd, r_buf))
}
13 changes: 8 additions & 5 deletions tokio/src/io/uring/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult
use crate::util::as_ref::OwnedBuf;

use io_uring::{opcode, types};
use std::io::{self, Error};
use std::io::{self, ErrorKind};
use std::os::fd::{AsRawFd, OwnedFd};

#[derive(Debug)]
Expand All @@ -13,12 +13,15 @@ pub(crate) struct Write {

impl Completable for Write {
type Output = (io::Result<u32>, OwnedBuf, OwnedFd);

fn complete(self, cqe: CqeResult) -> Self::Output {
(cqe.result, self.buf, self.fd)
}
let res = match cqe {
CqeResult::Single(cqe) => cqe,
CqeResult::Batch(_) => Err(ErrorKind::Unsupported.into()),
CqeResult::InitErr(err) => Err(err),
};

fn complete_with_error(self, err: Error) -> Self::Output {
(Err(err), self.buf, self.fd)
(res, self.buf, self.fd)
}
}

Expand Down
Loading
Loading