
7761: io-uring: Support batching #76

Open
martin-augment wants to merge 4 commits into master from
pr-7761-2026-03-11-10-17-33

Conversation

@martin-augment
Owner

7761: To review by AI

@coderabbitai

coderabbitai bot commented Mar 11, 2026

Walkthrough

This change introduces batch I/O operation support to the tokio I/O uring driver. The modifications add const generic batch-size parameters across the operation model, introduce a new CqeResult<N> enum to handle single, batched, and initialization error variants, and implement batch submission and cancellation logic. The file reading path is updated to attempt batch reads when buffer capacity exceeds a limit, with fallback to sequential reads on failure. Error handling is refactored to pattern-match on result variants, and new batch registration and cancellation helpers are added to the uring context.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (4)
tokio/src/runtime/driver/op.rs (2)

212-217: Silent skip when lifecycle entry is missing may hide bugs.

When ctx.ops.get_mut(*idx) returns None, the code silently continues. This could occur if an index was unexpectedly removed, potentially indicating a bug. Consider at minimum logging this case, or tracking it to ensure the completion check still works correctly.

The current logic relies on check_slab_entry returning true when all entries are None, which works but makes debugging harder if entries disappear unexpectedly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio/src/runtime/driver/op.rs` around lines 212 - 217, The loop over ids
currently silently continues when ctx.ops.get_mut(*idx) returns None; update the
logic in the for loop that iterates through ids (the block using ctx.ops.get_mut
and lifecycle) to record or log missing indices instead of silently
skipping—e.g., push missing idx into a local Vec or call a debug/error log on
the missing entry—so missing entries are visible during debugging and so the
later check_slab_entry invocation can account for these missing indices; ensure
any new collection or logging uses existing context (ctx, ids) and does not
change the final completion semantics (check_slab_entry should still be
consulted to determine overall completion).

58-67: The completed array is always allocated at full size regardless of operation type.

For single operations (Op::new), the completed: [i32; N] array with N = DEFAULT_BATCH_SIZE (255) is allocated but never used. This wastes ~1KB per single operation.

Consider using an enum for the completion storage or making single operations use N = 1.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio/src/runtime/driver/op.rs` around lines 58 - 67, The completed array
currently always allocates [i32; N] inside Op<T, const N> causing wasted memory
for single operations; change the storage to be conditional by either (A) making
Op::new instantiate Op with N = 1 for single-operation paths and keep the
generic N for batch paths, or (B) replace the completed: [i32; N] field with an
enum (e.g., Completed::Single(i32) | Completed::Batch(Box<[i32; N]>)) or a
heap-allocated boxed/small-array variant so single ops only store one i32;
update uses of completed (tests and methods on Op, especially any code in
functions named Op::new, methods accessing completed, and the State<N>
interactions) to handle both variants accordingly. Ensure constructors and drop
semantics are adjusted to construct the appropriate variant and all places that
iterate or index into completed handle the single vs batch case.
tokio/src/io/uring/read.rs (1)

146-148: Consider using MaybeUninit::array_assume_init when stable, or add safety comment.

The transmute_copy is correct here since all elements were initialized, but it lacks a safety comment explaining why this is sound. Also, transmute_copy leaves the original array in place and produces a bitwise duplicate, which would risk double ownership if the source were used afterwards (it is not).

Add safety documentation
+/// # Safety
+/// Caller must ensure all elements of `n_ops` have been initialized via `write()`.
 unsafe fn assume_init<const N: usize>(n_ops: [MaybeUninit<Entry>; N]) -> [Entry; N] {
+    // SAFETY: Caller guarantees all elements are initialized.
+    // transmute_copy is used because MaybeUninit::array_assume_init is unstable.
     unsafe { std::mem::transmute_copy(&n_ops) }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio/src/io/uring/read.rs` around lines 146 - 148, The unsafe helper
function assume_init<const N: usize>(n_ops: [MaybeUninit<Entry>; N]) -> [Entry;
N] uses std::mem::transmute_copy without any safety documentation; either switch
to the stable MaybeUninit::array_assume_init API when available or replace
transmute_copy with a sound move-out pattern (e.g., reading the array with
ptr::read) and add a clear safety comment above assume_init explaining that
every element of n_ops is fully initialized, that we take ownership and will not
use n_ops afterwards, and why the chosen approach does not violate aliasing or
drop safety for Entry.
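For reference, the `ptr::read` move-out pattern mentioned in the prompt can be written generically. This is a standalone, illustrative version (the real helper is specialized to io_uring's `Entry` type):

```rust
use std::mem::MaybeUninit;

// Illustrative generic version of the helper discussed above. `ptr::read`
// moves the values out; since `MaybeUninit<T>` never drops its contents,
// no double drop can occur even though `arr` remains owned afterwards.
//
// SAFETY: the caller must guarantee every element of `arr` was initialized.
unsafe fn assume_init<T, const N: usize>(arr: [MaybeUninit<T>; N]) -> [T; N] {
    // SAFETY: `[MaybeUninit<T>; N]` and `[T; N]` have identical layout, and
    // the caller promises all N elements are initialized.
    unsafe { std::ptr::read(arr.as_ptr().cast::<[T; N]>()) }
}

fn main() {
    let mut arr: [MaybeUninit<u32>; 4] = [MaybeUninit::uninit(); 4];
    for (i, slot) in arr.iter_mut().enumerate() {
        slot.write(i as u32 * 10);
    }
    // SAFETY: every element was initialized by the loop above.
    let init = unsafe { assume_init(arr) };
    assert_eq!(init, [0, 10, 20, 30]);
}
```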
tokio/src/runtime/io/driver/uring.rs (1)

121-124: The check_slab_entry logic is inverted.

The function comment says "check if the specified range of slab indexes exist or not" but the implementation returns true when all entries are None (i.e., don't exist). The function name check_slab_entry combined with returning true when entries are absent is confusing. Based on usage in op.rs line 248 (ctx.check_slab_entry(&*ids) to determine batch completion), this appears intentional—checking that all operations have been removed from the slab—but the naming is misleading.

Consider renaming to all_entries_removed or are_all_completed to clarify intent.

Suggested rename for clarity
-    // check if the specified range of slab indexes exist or not
-    pub(crate) fn check_slab_entry(&self, indexes: &[usize]) -> bool {
+    // Returns true if all indexed operations have been removed from the slab (i.e., completed)
+    pub(crate) fn all_entries_removed(&self, indexes: &[usize]) -> bool {
         indexes.iter().all(|i| self.ops.get(*i).is_none())
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio/src/runtime/io/driver/uring.rs` around lines 121 - 124, The method
check_slab_entry has inverted semantics (returns true when all slab slots are
None); rename it to a clearer name like all_entries_removed or
are_all_completed, update its doc comment to state it returns true when every
specified index is empty, and update every call site (e.g., the call in op.rs
that does ctx.check_slab_entry(&*ids)) to use the new name so the intent
(checking that all operations have been removed/completed) is clear.
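As a self-contained illustration of the renamed predicate's semantics, here is a toy model where a `HashMap` stands in for the driver's op slab (the function name follows the suggestion above; this is not tokio code):

```rust
use std::collections::HashMap;

// Toy model of the renamed predicate: returns true once every tracked index
// has been removed from the slab, i.e. every operation in the batch completed.
fn all_entries_removed(ops: &HashMap<usize, &'static str>, indexes: &[usize]) -> bool {
    indexes.iter().all(|i| !ops.contains_key(i))
}

fn main() {
    let mut ops = HashMap::new();
    ops.insert(3, "read");
    ops.insert(7, "read");
    let batch = [3, 7];
    assert!(!all_entries_removed(&ops, &batch)); // batch still in flight
    ops.remove(&3);
    ops.remove(&7);
    assert!(all_entries_removed(&ops, &batch)); // batch complete
}
```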

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 65646d5e-6ddb-415e-aa4c-616bdfdcd7bb

📥 Commits

Reviewing files that changed from the base of the PR and between a0ba4e8 and 5ac412a.

📒 Files selected for processing (6)
  • tokio/src/fs/read_uring.rs
  • tokio/src/io/uring/open.rs
  • tokio/src/io/uring/read.rs
  • tokio/src/io/uring/write.rs
  • tokio/src/runtime/driver/op.rs
  • tokio/src/runtime/io/driver/uring.rs

Comment on lines 17 to +20
// Max bytes we can read using io uring submission at a time
// SAFETY: cannot be higher than u32::MAX for safe cast
// Set to read max 64 MiB at time
const MAX_READ_SIZE: usize = 64 * 1024 * 1024;
pub(crate) const MAX_READ_SIZE: usize = 64 * 1024;

⚠️ Potential issue | 🟡 Minor

Comment/value mismatch: MAX_READ_SIZE is 64 KiB, not 64 MiB.

The comment says "Set to read max 64 MiB at time" but 64 * 1024 = 65,536 bytes = 64 KiB.

Fix comment
 // Max bytes we can read using io uring submission at a time
 // SAFETY: cannot be higher than u32::MAX for safe cast
-// Set to read max 64 MiB at time
+// Set to read max 64 KiB at a time
 pub(crate) const MAX_READ_SIZE: usize = 64 * 1024;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio/src/fs/read_uring.rs` around lines 17 - 20, The comment on
MAX_READ_SIZE incorrectly states 64 MiB while the value 64 * 1024 equals 64 KiB;
update the comment to reflect 64 KiB (or change the constant to 64 * 1024 * 1024
if the intent was 64 MiB). Specifically, modify the doc line "Set to read max 64
MiB at time" to "Set to read max 64 KiB at a time" (or adjust the constant if
you want MiB) while keeping the SAFETY note about u32::MAX consistent with the
chosen value for MAX_READ_SIZE.

Comment on lines +91 to +118
for (index, start) in (0..len).step_by(MAX_READ_SIZE).enumerate() {
let end = (start + MAX_READ_SIZE).min(len); // clamp to len for the final chunk
let array_idx = index % N;

// skip first iteration
if array_idx == 0 && index != 0 {
// SAFETY: Batches are valid array entries
let op = unsafe { Op::batch(n_ops.clone(), Read { fd, buf }) };
let (_, r_fd, r_buf) = uring_task(op, &mut read_len).await?;

fd = r_fd;
buf = r_buf;
} else {
let op = opcode::Read::new(
types::Fd(fd.as_raw_fd()),
buf.spare_capacity_mut()[start..].as_mut_ptr().cast(),
(end - start) as u32,
)
.offset(start as u64)
.build()
// link our sqes so cqes arrive in order
.flags(Flags::IO_LINK);

n_ops[array_idx] = op;
}

last_len = array_idx; // save last array_idx for last batch
}

⚠️ Potential issue | 🟠 Major

Off-by-one issue: First entry of subsequent batches is skipped.

When array_idx == 0 && index != 0, the code submits the current n_ops batch before setting n_ops[0] for the current iteration. This means:

  1. The first batch correctly gets entries at indices 0 to N-1
  2. When starting the second batch (index == N), array_idx is 0, so it submits the previous batch (correct), but then the current read operation for index == N is never added to n_ops—the loop continues to the next iteration

The logic should either:

  • Submit the batch, then fall through to set n_ops[array_idx], or
  • Restructure to submit after the array is filled
Suggested fix
         for (index, start) in (0..len).step_by(MAX_READ_SIZE).enumerate() {
             let end = (start + MAX_READ_SIZE).min(len); // clamp to len for the final chunk
             let array_idx = index % N;

             // skip first iteration
             if array_idx == 0 && index != 0 {
                 // SAFETY: Batches are valid array entries
                 let op = unsafe { Op::batch(n_ops.clone(), Read { fd, buf }) };
                 let (_, r_fd, r_buf) = uring_task(op, &mut read_len).await?;

                 fd = r_fd;
                 buf = r_buf;
-            } else {
-                let op = opcode::Read::new(
-                    types::Fd(fd.as_raw_fd()),
-                    buf.spare_capacity_mut()[start..].as_mut_ptr().cast(),
-                    (end - start) as u32,
-                )
-                .offset(start as u64)
-                .build()
-                // link our sqes so cqes arrive in order
-                .flags(Flags::IO_LINK);
-
-                n_ops[array_idx] = op;
             }
+
+            let op = opcode::Read::new(
+                types::Fd(fd.as_raw_fd()),
+                buf.spare_capacity_mut()[start..].as_mut_ptr().cast(),
+                (end - start) as u32,
+            )
+            .offset(start as u64)
+            .build()
+            // link our sqes so cqes arrive in order
+            .flags(Flags::IO_LINK);
+
+            n_ops[array_idx] = op;

             last_len = array_idx; // save last array_idx for last batch
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio/src/io/uring/read.rs` around lines 91 - 118, The loop currently skips
assigning the current iteration's op when array_idx == 0 && index != 0 because
the code submits the previous batch in the if branch and uses else for
assignment; change the control flow so the batch submission (unsafe {
Op::batch(n_ops.clone(), Read { fd, buf }) } + uring_task(...)) happens first
when array_idx == 0 && index != 0 but does NOT use else — after submitting,
continue to construct the Read opcode
(opcode::Read::new(...).offset(...).build().flags(Flags::IO_LINK)) and assign it
into n_ops[array_idx]; keep updating last_len = array_idx as before so the final
batch handling still works. This ensures the first entry of each subsequent
batch is populated instead of skipped.
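The control-flow bug is easy to reproduce without io_uring. This standalone sketch (function name and chunk counts are arbitrary, chosen for illustration) simulates which chunk indices get enqueued under the buggy `else` versus the suggested fall-through:

```rust
// Simulates the loop's control flow: with the `else`, chunk indices at batch
// boundaries (N, 2N, ...) are never enqueued; with the fall-through fix,
// every chunk is.
fn enqueued_chunks(total: usize, n: usize, buggy: bool) -> Vec<usize> {
    let mut out = Vec::new();
    for index in 0..total {
        let array_idx = index % n;
        if array_idx == 0 && index != 0 {
            // (the previous batch would be submitted here)
            if buggy {
                continue; // the `else` skips building this chunk's entry
            }
        }
        out.push(index);
    }
    out
}

fn main() {
    let buggy = enqueued_chunks(10, 4, true);
    let fixed = enqueued_chunks(10, 4, false);
    assert_eq!(buggy, vec![0, 1, 2, 3, 5, 6, 7, 9]); // chunks 4 and 8 are lost
    assert_eq!(fixed, (0..10).collect::<Vec<_>>());
}
```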

@augmentcode

augmentcode bot commented Mar 11, 2026

🤖 Augment PR Summary

Summary: Extends Tokio’s io-uring file read path to support submitting reads in batches.

Changes:

  • Introduces a batched read path in tokio::fs::read_uring that attempts io-uring batch submission for sufficiently large buffers, with a fallback to the existing single-op loop.
  • Makes MAX_READ_SIZE crate-visible and uses it as the chunk size for both single-op and batched reads.
  • Adds Op::batch / read_batch_size built on const-generics to submit arrays of SQEs and await a batch of CQE results.
  • Generalizes the driver Op future and Completable trait to carry either a single completion or a batched set of results via CqeResult.
  • Updates io-uring operations (Open, Read, Write) to handle the new CqeResult shape.
  • Adds driver support for batch registration (register_batch), batch cancellation, and tracking completion of multiple slab entries.

Technical Notes: Batch cancellation stores shared CancelData in an Arc so multiple in-flight SQEs can keep the same buffers alive until completion.



@augmentcode augmentcode bot left a comment


Review completed. 3 suggestions posted.



let array_idx = index % N;

// skip first iteration
if array_idx == 0 && index != 0 {

When array_idx == 0 && index != 0, this branch submits the previous batch but doesn't enqueue a read for the current start, so every Nth chunk is skipped and n_ops[0] may remain a stale entry for later submissions. That can truncate or corrupt the returned buffer for files that span more than N chunks.

Severity: high


}
None => return Err((r_fd, r_buf)),
},
Err(e) => {

In uring_task, ErrorKind::Interrupted results are currently ignored, which can leave unread holes in the buffer while still returning Ok(buf). Consider treating Interrupted as a retry/fallback condition (similar to op_read) to avoid returning partial/corrupt data.

Severity: medium

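One possible retry shape, sketched abstractly: the real fix would need to resubmit the affected SQE (and, with IO_LINK, the cancelled tail of the batch); `retry_on_eintr` and its retry cap here are illustrative, not tokio API:

```rust
use std::io::{Error, ErrorKind};

// Re-attempt an operation while it fails with ErrorKind::Interrupted,
// up to a cap, instead of silently swallowing the interruption.
fn retry_on_eintr<T>(
    max_retries: u32,
    mut attempt: impl FnMut() -> std::io::Result<T>,
) -> std::io::Result<T> {
    let mut tries = 0;
    loop {
        match attempt() {
            Err(e) if e.kind() == ErrorKind::Interrupted && tries < max_retries => tries += 1,
            other => return other,
        }
    }
}

fn main() {
    let mut calls = 0;
    let r = retry_on_eintr(5, || {
        calls += 1;
        if calls < 3 {
            Err(Error::from(ErrorKind::Interrupted)) // first two attempts interrupted
        } else {
            Ok(42)
        }
    });
    assert_eq!(r.unwrap(), 42);
    assert_eq!(calls, 3); // succeeded on the third attempt
}
```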


// Max bytes we can read using io uring submission at a time
// SAFETY: cannot be higher than u32::MAX for safe cast
// Set to read max 64 MiB at time

The comment says "max 64 MiB" but MAX_READ_SIZE is now 64 * 1024 (64 KiB); can we confirm whether the constant or the comment should be updated? This value also changes batching eligibility (read_to_end_batch) and the per-op chunk size in read_to_end.

Severity: low



@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 3 potential issues.


.flags(Flags::IO_LINK);

n_ops[array_idx] = op;
}

Batch loop skips every Nth chunk causing data loss

High Severity

When array_idx == 0 && index != 0, the if branch submits the previous batch but the else branch that creates the read op for the current chunk is skipped entirely. This means every chunk at a batch boundary (indices N, 2N, 3N…) is silently dropped. Additionally, n_ops[0] retains the stale entry from the previous batch, causing chunk 0 to be re-read in subsequent batches. When total chunks equals kN+1, last_len ends as 0, so the final if last_len > 0 guard prevents submission of the last chunk too.

Additional Locations (1)

// Per operation data.
data: Option<T>,
// Completed CQEs stored for checking batch completion
completed: [i32; N],

Default batch size bloats every single Op's stack

Medium Severity

DEFAULT_BATCH_SIZE is 255, making every Op<T> (including single operations like Op<Open>, Op<Write>, Op<Read>) carry State<255> (whose largest variant InitializeBatch([squeue::Entry; 255]) is ~16KB) and completed: [i32; 255] (~1KB). This means a simple single file open or write allocates ~17KB on the stack, all of which is unused for non-batch operations.


}

Poll::Pending
}

Slab index reuse corrupts unrelated operations between polls

Medium Severity

In PolledBatch, completed entries are removed from the slab via ctx.remove_op(*idx). Between polls (when the lock is released), newly registered operations can reuse those freed slab indices. On the next poll, the batch handler iterates all original ids, finds a reused slot via ctx.ops.get_mut(*idx), and either overwrites the new operation's waker or steals its CQE—silently corrupting an unrelated in-flight operation.

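A common mitigation for this class of reuse bug is a generation counter on each slot, so stale keys held by a completed batch no longer match a recycled slot. A minimal, hypothetical sketch (not tokio's slab; all names are illustrative):

```rust
// Each slot pairs its value with a generation that is bumped on removal.
// A key is (index, generation); a stale key fails the generation check.
struct Slot<T> {
    generation: u64,
    value: Option<T>,
}

struct VersionedSlab<T> {
    slots: Vec<Slot<T>>,
}

impl<T> VersionedSlab<T> {
    fn new() -> Self {
        Self { slots: Vec::new() }
    }

    fn insert(&mut self, value: T) -> (usize, u64) {
        // Reuse the first free slot, else grow.
        for (i, s) in self.slots.iter_mut().enumerate() {
            if s.value.is_none() {
                s.value = Some(value);
                return (i, s.generation);
            }
        }
        self.slots.push(Slot { generation: 0, value: Some(value) });
        (self.slots.len() - 1, 0)
    }

    fn remove(&mut self, idx: usize, generation: u64) -> Option<T> {
        let s = self.slots.get_mut(idx)?;
        if s.generation != generation {
            return None; // stale key: slot was recycled
        }
        s.generation += 1; // invalidate all outstanding keys
        s.value.take()
    }

    fn get(&self, idx: usize, generation: u64) -> Option<&T> {
        self.slots.get(idx).filter(|s| s.generation == generation)?.value.as_ref()
    }
}

fn main() {
    let mut slab = VersionedSlab::new();
    let (idx, g0) = slab.insert("batch op");
    assert_eq!(slab.remove(idx, g0), Some("batch op"));
    let (idx2, g1) = slab.insert("unrelated op"); // reuses the freed slot
    assert_eq!(idx, idx2);
    // The stale key from the old batch no longer resolves to the new op.
    assert!(slab.get(idx, g0).is_none());
    assert_eq!(slab.get(idx2, g1), Some(&"unrelated op"));
}
```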

@claude

claude bot commented Mar 11, 2026

PR Review: io-uring batching support. Thanks for this PR! The overall structure is a reasonable approach to batching. However, there are several correctness and safety issues.

CRITICAL:
  1. Off-by-one in read_batch_size: when index == N the entry is never built, n_ops[0] holds stale data, and the second batch re-reads offset 0 instead of N * MAX_READ_SIZE.
  2. EINTR handling silently drops data: with IO_LINK, subsequent entries get -ECANCELED, read_len comes up short, and buf.set_len sets an incorrect length (unsound).

HIGH:
  3. DEFAULT_BATCH_SIZE = 255 bloats all Op stack frames with 3+ KiB of unused data.
  4. unsafe fn batch() is missing a Safety doc comment.
  5. register_batch panics instead of calling check_and_init.

MEDIUM:
  6. Stale comment on MAX_READ_SIZE says 64 MiB but the value is 64 KiB.
  7. n_ops.clone() in the hot loop copies 3.2 KiB unnecessarily.
  8. The batch path is wastefully triggered for near-single-chunk files.

NITS: typos (completly, Initalization); misleading "skip first iteration" comment; unnecessary Arc in Lifecycle::Cancelled; no tests for the batching logic.

@claude

claude bot commented Mar 11, 2026

PR Review: io-uring batching support (full review)

CRITICAL BUG 1: Off-by-one in read_batch_size

When index == N, array_idx == 0 so the batch is submitted but the entry for index == N is NEVER BUILT. n_ops[0] retains the stale entry from index == 0 of the first batch. The second batch re-reads at offset 0 instead of N * MAX_READ_SIZE. Every subsequent batch has the same problem. Fix: build the entry before deciding to submit.

CRITICAL BUG 2: EINTR in linked batches silently drops data

In uring_task, EINTR falls through without updating read_len. With IO_LINK, an interrupted entry causes subsequent entries to be cancelled with -ECANCELED. The code swallows both, leaving read_len short. buf.set_len(buf.len() + read_len) then sets an incorrect length — unsound. There is no retry unlike the single-op path.

HIGH: DEFAULT_BATCH_SIZE = 255 bloats all Op stack frames

Op now carries completed: [i32; 255] (~1 KiB) and State::PolledBatch([usize; 255]) (~2 KiB) even for single operations. Every io-uring op wastes 3+ KiB of stack it never needs. Use a smaller default or split Op into single/batch types.

HIGH: unsafe fn batch() has no Safety doc comment — same buffer-lifetime requirements as Op::new but undocumented.

HIGH: register_batch panics instead of falling back — uses assert!(uring_probe.initialized()) while register_op uses check_and_init. Will panic if called before io_uring is set up.

MEDIUM: MAX_READ_SIZE comment says 64 MiB but constant is now 64 KiB.

MEDIUM: n_ops.clone() in the hot loop copies ~3.2 KiB per batch boundary unnecessarily.

MEDIUM: Near-single-chunk files trigger batch path with 198 Nop entries.

Nits: completly->completely (read.rs:90); Initalization->Initialization (op.rs:176, 196); skip-first-iteration comment is wrong — runs every Nth iteration; Arc wrapping in Lifecycle::Cancelled adds allocation overhead to single-op cancellations; no tests for the new batching logic.

@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request integrates batching functionality into Tokio's io_uring implementation, primarily targeting file read operations. The core changes involve adapting the underlying Op and Completable structures to gracefully manage multiple asynchronous I/O completions at once. This enhancement aims to boost performance for I/O-bound tasks by reducing the overhead associated with individual submission and completion events, making the io_uring backend more efficient for high-throughput scenarios.

Highlights

  • Batching Support for io_uring Reads: Introduced the capability to perform batched read operations using io_uring, significantly improving efficiency for large file reads by submitting multiple requests simultaneously.
  • Refactored Operation Completion Handling: Modified the Op and Completable traits to handle both single and batched completion queue entries (CQEs) through a new CqeResult enum, allowing for more flexible and robust result processing.
  • Conditional Batched File Reading: The read_uring function now intelligently decides whether to use batched reads based on the buffer's capacity, falling back to traditional single reads if batching is not beneficial or encounters issues.
  • Enhanced Error and Cancellation Management: Updated error handling to differentiate between single, batch, and initial errors, and extended cancellation mechanisms to properly manage batched io_uring operations.
Changelog
  • tokio/src/fs/read_uring.rs
    • Imported CqeResult and BATCH_SIZE constant.
    • Reduced MAX_READ_SIZE to 64KB for better batching granularity.
    • Introduced read_to_end_batch function to conditionally perform batched reads.
    • Modified op_read to handle the new CqeResult enum variants for completion.
  • tokio/src/io/uring/open.rs
    • Updated Completable implementation to process CqeResult enum variants, including Batch and InitErr.
  • tokio/src/io/uring/read.rs
    • Imported MAX_READ_SIZE and i32_to_result for error conversion.
    • Modified Completable trait to be generic over a constant N for batch size.
    • Implemented read_batch_size for submitting multiple read operations in a batch.
    • Added uring_task helper function to poll and process results from batched operations.
  • tokio/src/io/uring/write.rs
    • Adjusted Write's Completable implementation to correctly handle the new CqeResult enum.
  • tokio/src/runtime/driver/op.rs
    • Introduced DEFAULT_BATCH_SIZE constant.
    • Modified Lifecycle enum to store Arc<CancelData> for cancellation.
    • Refactored State enum to include InitializeBatch and PolledBatch for batched operations.
    • Added a completed array to the Op struct to store results from batched CQEs.
    • Introduced Op::batch constructor for creating batched operations.
    • Updated Drop implementation for Op to correctly handle batched cancellations.
    • Refactored CqeResult into an enum (Single, Batch, InitErr) and made it generic over N.
    • Modified Completable trait to be generic over N and accept the new CqeResult enum.
    • Updated Future::poll implementation to manage InitializeBatch and PolledBatch states.
    • Added cqe_to_result and i32_to_result helper functions for result conversion.
  • tokio/src/runtime/io/driver/uring.rs
    • Reordered and added Arc import.
    • Added check_slab_entry function to verify the presence of slab entries for batched operations.
    • Implemented register_batch for submitting multiple io_uring entries.
    • Modified cancel_op to use Arc<CancelData> for cancellation data.
    • Added cancel_batched_op for cancelling multiple operations in a batch.
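The batched read path summarized above splits a large read into fixed-size chunks and submits the chunks in groups of the batch size. A minimal, self-contained sketch of that chunking arithmetic follows; the constant values are illustrative assumptions, not the crate's actual MAX_READ_SIZE or BATCH_SIZE:

```rust
// Illustrative constants; the real values live in tokio's uring driver.
const MAX_READ_SIZE: usize = 64 * 1024; // 64 KiB per chunk (assumed)
const BATCH_SIZE: usize = 4; // submission entries per batch (assumed)

/// Returns (number of chunks, number of batches) needed to read `len` bytes.
fn chunk_plan(len: usize) -> (usize, usize) {
    // Chunks of MAX_READ_SIZE bytes, rounded up for the final partial chunk.
    let chunks = len.div_ceil(MAX_READ_SIZE);
    // Batches of BATCH_SIZE chunks, rounded up for the final partial batch.
    let batches = chunks.div_ceil(BATCH_SIZE);
    (chunks, batches)
}

fn main() {
    // 1 MiB file: 16 chunks of 64 KiB, submitted as 4 full batches.
    assert_eq!(chunk_plan(1024 * 1024), (16, 4));
    // 100 KiB file: 2 chunks, 1 (partial) batch.
    assert_eq!(chunk_plan(100 * 1024), (2, 1));
    println!("ok");
}
```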
Activity
  • The pull request was submitted by martin-augment.
  • The author explicitly requested an AI review in the description.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

• Code Review (/gemini review): Performs a code review for the current pull request in its current state.
• Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
• Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
• Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder at the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces batching for io-uring read operations, aiming for performance enhancement. However, the implementation of read_batch_size in tokio/src/io/uring/read.rs contains critical logic errors that lead to uninitialized memory exposure and potential out-of-bounds access. Specifically, the batching loop skips operations at certain indices, creating gaps in the buffer that are never written to by the kernel but are included in the final buffer length, leading to an information leak. Additionally, the indexing into the buffer's spare capacity is incorrect when the buffer is not empty. There's also an opportunity to enhance code safety by replacing a manual unsafe operation with a safer standard library equivalent, and a minor issue with a misleading code comment. Addressing these issues is crucial before merging.

Comment on lines +91 to +118
for (index, start) in (0..len).step_by(MAX_READ_SIZE).enumerate() {
    let end = (start + MAX_READ_SIZE).min(len); // clamp to len for the final chunk
    let array_idx = index % N;

    // skip first iteration
    if array_idx == 0 && index != 0 {
        // SAFETY: Batches are valid array entries
        let op = unsafe { Op::batch(n_ops.clone(), Read { fd, buf }) };
        let (_, r_fd, r_buf) = uring_task(op, &mut read_len).await?;

        fd = r_fd;
        buf = r_buf;
    } else {
        let op = opcode::Read::new(
            types::Fd(fd.as_raw_fd()),
            buf.spare_capacity_mut()[start..].as_mut_ptr().cast(),
            (end - start) as u32,
        )
        .offset(start as u64)
        .build()
        // link our sqes so cqes arrive in order
        .flags(Flags::IO_LINK);

        n_ops[array_idx] = op;
    }

    last_len = array_idx; // save last array_idx for last batch
}

security-critical critical

The batching logic in read_batch_size is flawed, leading to a critical information leak vulnerability. When a full batch is submitted (array_idx == 0 && index != 0), the else branch is skipped, meaning the opcode::Read for that iteration is not created and not added to n_ops. This effectively skips a chunk of the file and creates gaps in the buffer that are never written to by the kernel. Since read_len is updated with the total bytes read by the submitted operations, and buf.set_len is called at the end, the buffer's length will include these uninitialized gaps, exposing potentially sensitive data.

            if array_idx == 0 && index != 0 {
                // SAFETY: Batches are valid array entries
                let op = unsafe { Op::batch(n_ops.clone(), Read { fd, buf }) };
                let (_, r_fd, r_buf) = uring_task(op, &mut read_len).await?;

                fd = r_fd;
                buf = r_buf;
            }
            let op = opcode::Read::new(
                types::Fd(fd.as_raw_fd()),
                buf.spare_capacity_mut()[start..].as_mut_ptr().cast(),
                (end - start) as u32,
            )
            .offset(start as u64)
            .build()
            // link our sqes so cqes arrive in order
            .flags(Flags::IO_LINK);

            n_ops[array_idx] = op;
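To make the skipped-chunk behavior concrete, here is a small standalone sketch of the control flow the review criticizes. N and the chunk count are illustrative values, not the crate's; the point is which chunk indices the original if/else actually builds a read for:

```rust
// Batch size used by the sketch (illustrative, not tokio's BATCH_SIZE).
const N: usize = 4;

/// Simulates the original loop: a chunk is only built in the `else` branch,
/// so every N-th chunk (except index 0) is silently skipped.
fn built_chunks(total: usize) -> Vec<usize> {
    let mut built = Vec::new();
    for index in 0..total {
        let array_idx = index % N;
        if array_idx == 0 && index != 0 {
            // The original code submits the accumulated batch here and does
            // NOT build a read for this `index`: the chunk is skipped.
        } else {
            built.push(index);
        }
    }
    built
}

fn main() {
    // Chunks 4 and 8 are missing: those byte ranges stay uninitialized,
    // yet set_len() later includes them in the buffer's length.
    assert_eq!(built_chunks(10), vec![0, 1, 2, 3, 5, 6, 7, 9]);
    println!("ok");
}
```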

} else {
    let op = opcode::Read::new(
        types::Fd(fd.as_raw_fd()),
        buf.spare_capacity_mut()[start..].as_mut_ptr().cast(),

security-high high

The index start is calculated based on the buffer's capacity, but it is used to index into the buffer's spare capacity. If the buffer is not empty, buf.spare_capacity_mut()[start..] will eventually attempt to access an index out of bounds of the spare capacity, leading to a panic. This is because spare_capacity_mut().len() is capacity - len, and start can go up to capacity.


// Max bytes we can read using io uring submission at a time
// SAFETY: cannot be higher than u32::MAX for safe cast
// Set to read max 64 MiB at time

medium

The comment indicates a read size of 64 MiB, but the constant MAX_READ_SIZE is set to 64 KiB. The comment should be updated to reflect the actual value to avoid confusion.

Suggested change
// Set to read max 64 MiB at time
// Set to read max 64 KiB at time

    n_ops[i].write(opcode::Nop::new().build());
}

let mut n_ops = unsafe { assume_init(n_ops) };

medium

The custom assume_init function uses unsafe transmute_copy. A safer option is MaybeUninit::array_assume_init, which removes the need for the custom unsafe helper; note, however, that array_assume_init is still unstable and only available on nightly behind the maybe_uninit_array_assume_init feature. It is also an unsafe fn, so the call must be wrapped in an unsafe block.

You can remove the assume_init function entirely with this change.

        let mut n_ops = unsafe { MaybeUninit::array_assume_init(n_ops) };

Repository owner deleted a comment from claude bot Mar 11, 2026