13 changes: 10 additions & 3 deletions tokio/src/runtime/io/driver/uring.rs
@@ -2,12 +2,14 @@ use io_uring::{squeue::Entry, IoUring, Probe};
use mio::unix::SourceFd;
use slab::Slab;

use crate::runtime::driver::op::CancelData;
use crate::runtime::driver::op::CqeResult;
use crate::runtime::driver::op::{Cancellable, Lifecycle};
use crate::{io::Interest, loom::sync::Mutex};

use super::{Handle, TOKEN_WAKEUP};

-use std::os::fd::{AsRawFd, RawFd};
+use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
use std::{io, mem, task::Waker};

const DEFAULT_RING_SIZE: u32 = 256;
@@ -77,9 +79,14 @@ impl UringContext {
waker.wake_by_ref();
*ops.get_mut(idx).unwrap() = Lifecycle::Completed(cqe);
}
-Some(Lifecycle::Cancelled(_)) => {
+Some(Lifecycle::Cancelled(CancelData::Open(_))) => {
CancelData also has Read/Write variants; with this match only handling Cancelled(Open(_)), any cancelled read/write that later completes will now hit the Some(other) arm and panic the runtime. That seems like a functional regression from the previous Cancelled(_) handling (remove-and-discard).

Severity: high


Owner Author

value:useful; category:bug; feedback: The Augment AI reviewer is correct! Before the proposed changes, any cancellation removed the index from the list of operations; with the proposed changes, only cancelled Open operations are handled. Fixing this prevents leaking operation indices for all but Open operations.

if let Ok(fd) = CqeResult::from(cqe).result {
// SAFETY: the successful CQE result provides
// a non-negative integer, and the event is
// related to an open operation.
unsafe { OwnedFd::from_raw_fd(fd as i32) };
}
Cancelled Read/Write operations cause runtime panic

High Severity

The previous code matched all Cancelled(_) variants in dispatch_completions, gracefully removing them from the slab. The new code only matches Cancelled(CancelData::Open(_)), so Cancelled(CancelData::Read(_)) and Cancelled(CancelData::Write(_)) now fall through to the Some(other) arm which panics. Since Op::drop calls cancel_op for any polled Read or Write operation, this will crash the runtime when the kernel returns a CQE for a cancelled read or write.
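The fall-through described above can be reproduced with simplified stand-ins for tokio's internal `Lifecycle`/`CancelData` types (the enum shapes here are illustrative, not tokio's actual definitions):

```rust
// Minimal repro of the narrowed-match fall-through, using mock types.
#[derive(Debug)]
enum CancelData {
    Open,
    Read,
    Write,
}

#[derive(Debug)]
enum Lifecycle {
    Cancelled(CancelData),
}

fn dispatch(slot: Option<Lifecycle>) -> &'static str {
    match slot {
        // Narrowed arm: only cancelled Open ops are discarded...
        Some(Lifecycle::Cancelled(CancelData::Open)) => "discarded",
        // ...so cancelled Read/Write now reach the catch-all and panic.
        Some(other) => panic!("unexpected lifecycle: {other:?}"),
        None => "no-op",
    }
}

fn main() {
    assert_eq!(dispatch(Some(Lifecycle::Cancelled(CancelData::Open))), "discarded");
    let read = std::panic::catch_unwind(|| dispatch(Some(Lifecycle::Cancelled(CancelData::Read))));
    println!("cancelled read panics: {}", read.is_err()); // prints "cancelled read panics: true"
}
```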


Owner Author
value:useful; category:bug; feedback: The Bugbot AI reviewer is correct! Before the proposed changes, any cancellation removed the index from the list of operations; with the proposed changes, only cancelled Open operations are handled. Fixing this prevents leaking operation indices for all but Open operations.

Comment on lines +82 to +88
⚠️ Potential issue | 🟠 Major

This still misses the post-completion cancel path.

This only handles the case where the slot is already Cancelled(Open) when the CQE is dispatched. If the CQE lands first, cancel_op later sees Lifecycle::Completed(_) and removes the slot without consuming a successful open result, so the fd can still leak. Please reuse the same cleanup when discarding a completed Open CQE.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio/src/runtime/io/driver/uring.rs` around lines 82 - 88, The code only
drops the fd when the slot is already Lifecycle::Cancelled(CancelData::Open(_))
at CQE handling; you must also perform the same cleanup when cancel_op removes a
slot that is already Lifecycle::Completed with an Open result. Update the
cancel_op (and/or the branch that handles Lifecycle::Completed(_)) to detect
Completed(CancelData::Open(_)) and, if CqeResult::from(cqe).result is Ok(fd),
call the same unsafe cleanup (OwnedFd::from_raw_fd(fd as i32)) to consume and
drop the fd; reuse the same logic used in the existing
Some(Lifecycle::Cancelled(CancelData::Open(_))) branch so successful open
results cannot leak.
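The symmetric cleanup suggested above can be sketched with mock types standing in for tokio's internal slab state; the real code would do `drop(unsafe { OwnedFd::from_raw_fd(fd) })` where the recording `close_fd` helper is used here, and the function shape is illustrative, not tokio's actual `cancel_op` signature:

```rust
// If cancel_op finds the slot already Completed for an Open op, the
// stored result is a raw fd that must be consumed, mirroring the
// cleanup done in the Cancelled(Open) completion arm.
#[derive(Debug)]
enum CancelData {
    Open,
    Read,
    Write,
}

enum Lifecycle {
    Completed(i32), // stand-in for the stored CQE result
    Cancelled(CancelData),
}

fn close_fd(fd: i32, closed: &mut Vec<i32>) {
    closed.push(fd); // record instead of actually closing, for the sketch
}

fn cancel_op(slot: Lifecycle, data: CancelData, closed: &mut Vec<i32>) {
    match slot {
        Lifecycle::Completed(res) => {
            // Same cleanup as the Cancelled(Open) completion arm:
            // a successful open result is an fd that would otherwise leak.
            if matches!(data, CancelData::Open) && res >= 0 {
                close_fd(res, closed);
            }
            // ...then the slot is removed from the slab either way.
        }
        Lifecycle::Cancelled(_) => { /* completion path handles cleanup */ }
    }
}

fn main() {
    let mut closed = Vec::new();
    cancel_op(Lifecycle::Completed(7), CancelData::Open, &mut closed);
    cancel_op(Lifecycle::Completed(3), CancelData::Read, &mut closed);
    println!("closed fds: {closed:?}"); // prints "closed fds: [7]"
}
```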

// Op future was cancelled, so we discard the result.
// We just remove the entry from the slab.
ops.remove(idx);
}
Comment on lines +82 to 91
critical

This change correctly handles closing file descriptors for cancelled open operations, but it introduces a regression. By matching only on Lifecycle::Cancelled(CancelData::Open(_)), any other cancelled operation (e.g., Read, Write) will now fall through to the Some(other) case and cause a panic. The previous implementation correctly handled all Cancelled variants.

To fix this, you should handle all Lifecycle::Cancelled variants in one arm and then check if it's an Open operation inside.

Additionally, using let _ = ... makes it more explicit that you are intentionally creating and dropping the OwnedFd to close the file descriptor.

                Some(Lifecycle::Cancelled(cancel_data)) => {
                    if matches!(cancel_data, CancelData::Open(_)) {
                        if let Ok(fd) = CqeResult::from(cqe).result {
                            // SAFETY: the successful CQE result provides
                            // a non-negative integer, and the event is
                            // related to an open operation.
                            let _ = unsafe { OwnedFd::from_raw_fd(fd as i32) };
                        }
                    }
                    // Op future was cancelled, so we discard the result.
                    ops.remove(idx);
                }

Owner Author
value:useful; category:bug; feedback: The Gemini AI reviewer is correct! Before the proposed changes, any cancellation removed the index from the list of operations; with the proposed changes, only cancelled Open operations are handled. Fixing this prevents leaking operation indices for all but Open operations.

Comment on lines +82 to 91
⚠️ Potential issue | 🔴 Critical

Don't regress cancelled read/write ops into a panic.

tokio/src/runtime/driver/op.rs still defines CancelData::Read and CancelData::Write, so narrowing this arm to Cancelled(Open) makes those completions fall through to Some(other) and panic instead of being discarded.

Suggested fix
-                Some(Lifecycle::Cancelled(CancelData::Open(_))) => {
-                    if let Ok(fd) = CqeResult::from(cqe).result {
-                        // SAFETY: the successful CQE result provides
-                        // a non-negative integer, and the event is
-                        // related to an open operation.
-                        unsafe { OwnedFd::from_raw_fd(fd as i32) };
-                    }
+                Some(Lifecycle::Cancelled(cancel_data)) => {
+                    let is_open = matches!(cancel_data, CancelData::Open(_));
+                    if is_open {
+                        if let Ok(fd) = CqeResult::from(cqe).result {
+                            // SAFETY: the successful CQE result provides
+                            // a non-negative integer, and the event is
+                            // related to an open operation.
+                            drop(unsafe { OwnedFd::from_raw_fd(fd as i32) });
+                        }
+                    }
                     // Op future was cancelled, so we discard the result.
                     ops.remove(idx);
                 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio/src/runtime/io/driver/uring.rs` around lines 82 - 91, The match arm
currently only handles Some(Lifecycle::Cancelled(CancelData::Open(_))) which
lets CancelData::Read and CancelData::Write fall through and panic; change the
arm to handle any Lifecycle::Cancelled(_) so cancelled read/write ops are
discarded instead of panicking. Specifically, update the match from
Some(Lifecycle::Cancelled(CancelData::Open(_))) to
Some(Lifecycle::Cancelled(cancel_data)) (or use a wildcard
Some(Lifecycle::Cancelled(_))), then inside the block only call
CqeResult::from(cqe).result -> if it’s Ok(fd) and the cancel_data is
CancelData::Open(_) then SAFELY convert to OwnedFd::from_raw_fd(fd as i32); in
all cases call ops.remove(idx) to discard the cancelled operation.

Owner Author
value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! Before the proposed changes, any cancellation removed the index from the list of operations; with the proposed changes, only cancelled Open operations are handled. Fixing this prevents leaking operation indices for all but Open operations.

Some(other) => {
48 changes: 48 additions & 0 deletions tokio/tests/fs_uring.rs
@@ -9,6 +9,7 @@
))]

use futures::future::FutureExt;
use std::fs;
use std::sync::mpsc;
use std::task::Poll;
use std::time::Duration;
@@ -145,6 +146,53 @@ async fn cancel_op_future() {
assert!(res.is_cancelled());
}

// see: https://github.com/tokio-rs/tokio/issues/7979
#[tokio::test]
async fn file_descriptors_are_closed_when_cancelling_open_op() {
let (_tmp_file, path): (Vec<NamedTempFile>, Vec<PathBuf>) = create_tmp_files(1);

let fd_count_before_access = fs::read_dir("/proc/self/fd").unwrap().count();

for _ in 0..128 {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

let path = path.clone();
let handle = tokio::spawn(async move {
poll_fn(|cx| {
let opt = {
let mut opt = tokio::fs::OpenOptions::new();
opt.read(true);
opt
};

let fut = opt.open(&path[0]);

// If io_uring is enabled (and not falling back to the thread pool),
// the first poll should return Pending.
let _pending = Box::pin(fut).poll_unpin(cx);
This stores the first poll result in _pending but never asserts it is actually Poll::Pending; if the runtime falls back (or the op completes synchronously), the test can pass without exercising the cancelled in-flight io_uring open case it’s meant to cover.

Severity: medium


Owner Author
value:useful; category:bug; feedback: The Augment AI reviewer is correct! The result of poll_unpin() is ignored on the assumption that it is Pending, but that is never asserted. Asserting it prevents the test from silently passing when the result is not Pending.


tx.send(()).unwrap();

Poll::<()>::Pending
})
.await;
});

// Wait for the first poll
rx.recv().await.unwrap();

handle.abort();

let res = handle.await.unwrap_err();
assert!(res.is_cancelled());
}

let fd_count_after_cancel = fs::read_dir("/proc/self/fd").unwrap().count();
let leaked = fd_count_after_cancel.saturating_sub(fd_count_before_access);

assert!(leaked <= 64);
Comment on lines +154 to +193
⚠️ Potential issue | 🟠 Major

This regression can pass without ever exercising the bug.

poll_unpin is ignored, so a synchronous Ready (fallback / non-io_uring path) turns the loop into a no-op, and the final leaked <= 64 check runs before completions have necessarily drained. That both weakens coverage and makes the bound noisy. Assert that the first poll is Pending, then wait for the fd count to settle back to the baseline (or a tight, explained bound) before asserting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio/tests/fs_uring.rs` around lines 154 - 193, The test currently ignores
the result of Box::pin(fut).poll_unpin(cx) so synchronous Ready cases bypass the
intended Pending path; change the poll site in the spawned task (the poll_fn
that calls OpenOptions::open / the Box::pin(fut).poll_unpin(cx)) to capture the
Poll result and assert it is Pending before sending on tx, ensuring the first
poll is actually Pending; after aborting the handle and awaiting cancellation,
poll / read the fd count in a short loop with a small sleep (or a bounded retry)
until it settles back to the baseline (or a documented tight bound) and only
then compute leaked and assert leaked <= 64.
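The "wait for the fd count to settle" hardening suggested above can be sketched as a bounded retry loop; the helper name, retry count, and sleep interval are illustrative, not from the PR:

```rust
use std::{thread, time::Duration};

// Bounded retry loop: wait for a measured count to drain back to a
// baseline instead of asserting immediately after cancellation.
fn wait_until_settled(baseline: usize, mut read_count: impl FnMut() -> usize) -> bool {
    for _ in 0..50 {
        if read_count() <= baseline {
            return true; // drained back to (or below) the baseline
        }
        thread::sleep(Duration::from_millis(10));
    }
    false // still above baseline after ~500 ms: treat as a leak
}

fn main() {
    // Simulated fd counter that drains over a few polls; in the real test
    // this closure would count entries in /proc/self/fd.
    let mut in_flight = 3usize;
    let baseline = 10;
    let settled = wait_until_settled(baseline, || {
        let count = baseline + in_flight;
        in_flight = in_flight.saturating_sub(1);
        count
    });
    println!("settled: {settled}"); // prints "settled: true"
}
```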

Owner Author
value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! The result of poll_unpin() is ignored on the assumption that it is Pending, but that is never asserted. Asserting it prevents the test from silently passing when the result is not Pending.

}

fn create_tmp_files(num_files: usize) -> (Vec<NamedTempFile>, Vec<PathBuf>) {
let mut files = Vec::with_capacity(num_files);
for _ in 0..num_files {