diff --git a/tokio/src/runtime/io/driver/uring.rs b/tokio/src/runtime/io/driver/uring.rs index 89c97826bdf..53003c41f97 100644 --- a/tokio/src/runtime/io/driver/uring.rs +++ b/tokio/src/runtime/io/driver/uring.rs @@ -2,12 +2,14 @@ use io_uring::{squeue::Entry, IoUring, Probe}; use mio::unix::SourceFd; use slab::Slab; +use crate::runtime::driver::op::CancelData; +use crate::runtime::driver::op::CqeResult; use crate::runtime::driver::op::{Cancellable, Lifecycle}; use crate::{io::Interest, loom::sync::Mutex}; use super::{Handle, TOKEN_WAKEUP}; -use std::os::fd::{AsRawFd, RawFd}; +use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}; use std::{io, mem, task::Waker}; const DEFAULT_RING_SIZE: u32 = 256; @@ -77,9 +79,14 @@ impl UringContext { waker.wake_by_ref(); *ops.get_mut(idx).unwrap() = Lifecycle::Completed(cqe); } - Some(Lifecycle::Cancelled(_)) => { + Some(Lifecycle::Cancelled(CancelData::Open(_))) => { + if let Ok(fd) = CqeResult::from(cqe).result { + // SAFETY: the successful CQE result provides + // a non-negative integer, and the event is + // related to an open operation. + unsafe { OwnedFd::from_raw_fd(fd as i32) }; + } // Op future was cancelled, so we discard the result. - // We just remove the entry from the slab. ops.remove(idx); } Some(other) => { diff --git a/tokio/tests/fs_uring.rs b/tokio/tests/fs_uring.rs index cd0d207d278..4f985916921 100644 --- a/tokio/tests/fs_uring.rs +++ b/tokio/tests/fs_uring.rs @@ -9,6 +9,7 @@ ))] use futures::future::FutureExt; +use std::fs; use std::sync::mpsc; use std::task::Poll; use std::time::Duration; @@ -145,6 +146,53 @@ async fn cancel_op_future() { assert!(res.is_cancelled()); } +// see: https://github.com/tokio-rs/tokio/issues/7979 +#[tokio::test] +async fn file_descriptors_are_closed_when_cancelling_open_op() { + let (_tmp_file, path): (Vec, Vec) = create_tmp_files(1); + + let fd_count_before_access = fs::read_dir("/proc/self/fd").unwrap().count(); + + for _ in 0..128 { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + + let path = path.clone(); + let handle = tokio::spawn(async move { + poll_fn(|cx| { + let opt = { + let mut opt = tokio::fs::OpenOptions::new(); + opt.read(true); + opt + }; + + let fut = opt.open(&path[0]); + + // If io_uring is enabled (and not falling back to the thread pool), + // the first poll should return Pending. + let _pending = Box::pin(fut).poll_unpin(cx); + + tx.send(()).unwrap(); + + Poll::<()>::Pending + }) + .await; + }); + + // Wait for the first poll + rx.recv().await.unwrap(); + + handle.abort(); + + let res = handle.await.unwrap_err(); + assert!(res.is_cancelled()); + } + + let fd_count_after_cancel = fs::read_dir("/proc/self/fd").unwrap().count(); + let leaked = fd_count_after_cancel.saturating_sub(fd_count_before_access); + + assert!(leaked <= 64); +} + fn create_tmp_files(num_files: usize) -> (Vec, Vec) { let mut files = Vec::with_capacity(num_files); for _ in 0..num_files {