From ca7d44191d36a85999a23cfd59ad68892ceb735b Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Fri, 27 Mar 2026 17:26:37 -0600 Subject: [PATCH 1/4] io: AioSource now employs IO Safety --- tokio/Cargo.toml | 8 ++++---- tokio/src/io/bsd/poll_aio.rs | 9 ++++----- tokio/tests/io_async_fd.rs | 9 +++++---- tokio/tests/io_async_fd_memory_leak.rs | 9 +++++---- tokio/tests/io_poll_aio.rs | 14 +++++++------- tokio/tests/net_unix_pipe.rs | 6 +++--- 6 files changed, 28 insertions(+), 27 deletions(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index debfc0a047d..a90d8de7c67 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -96,7 +96,7 @@ pin-project-lite = "0.2.11" # Everything else is optional... bytes = { version = "1.2.1", optional = true } -mio = { version = "1.0.1", optional = true, default-features = false } +mio = { version = "1.2.0", optional = true, default-features = false } parking_lot = { version = "0.12.0", optional = true } [target.'cfg(not(target_family = "wasm"))'.dependencies] @@ -112,7 +112,7 @@ tracing = { version = "0.1.29", default-features = false, features = ["std"], op [target.'cfg(all(tokio_unstable, target_os = "linux"))'.dependencies] io-uring = { version = "0.7.11", default-features = false, optional = true } libc = { version = "0.2.168", optional = true } -mio = { version = "1.0.1", default-features = false, features = ["os-poll", "os-ext"], optional = true } +mio = { version = "1.2.0", default-features = false, features = ["os-poll", "os-ext"], optional = true } slab = { version = "0.4.9", optional = true } backtrace = { version = "0.3.58", optional = true } @@ -122,7 +122,7 @@ signal-hook-registry = { version = "1.1.1", optional = true } [target.'cfg(unix)'.dev-dependencies] libc = { version = "0.2.168" } -nix = { version = "0.29.0", default-features = false, features = ["aio", "fs", "socket"] } +nix = { version = "0.31.0", default-features = false, features = ["aio", "fs", "socket"] } [target.'cfg(windows)'.dependencies.windows-sys] version = "0.61" @@ -157,7 +157,7 @@ rand = "0.9" wasm-bindgen-test = "0.3.0" [target.'cfg(target_os = "freebsd")'.dev-dependencies] -mio-aio = { version = "1", features = ["tokio"] } +mio-aio = { version = "2", features = ["tokio"] } [target.'cfg(loom)'.dev-dependencies] loom = { version = "0.7", features = ["futures", "checkpoint"] } diff --git a/tokio/src/io/bsd/poll_aio.rs b/tokio/src/io/bsd/poll_aio.rs index 086ba6d93bc..834475e500d 100644 --- a/tokio/src/io/bsd/poll_aio.rs +++ b/tokio/src/io/bsd/poll_aio.rs @@ -9,8 +9,7 @@ use mio::Token; use std::fmt; use std::io; use std::ops::{Deref, DerefMut}; -use std::os::unix::io::AsRawFd; -use std::os::unix::prelude::RawFd; +use std::os::fd::{AsFd, BorrowedFd}; use std::task::{ready, Context, Poll}; /// Like [`mio::event::Source`], but for POSIX AIO only. @@ -19,7 +18,7 @@ use std::task::{ready, Context, Poll}; /// [`Aio`] object. pub trait AioSource { /// Registers this AIO event source with Tokio's reactor. - fn register(&mut self, kq: RawFd, token: usize); + fn register(&mut self, kq: BorrowedFd<'_>, token: usize); /// Deregisters this AIO event source with Tokio's reactor. fn deregister(&mut self); @@ -37,7 +36,7 @@ impl Source for MioSource { interests: mio::Interest, ) -> io::Result<()> { assert!(interests.is_aio() || interests.is_lio()); - self.0.register(registry.as_raw_fd(), usize::from(token)); + self.0.register(registry.as_fd(), usize::from(token)); Ok(()) } @@ -53,7 +52,7 @@ impl Source for MioSource { interests: mio::Interest, ) -> io::Result<()> { assert!(interests.is_aio() || interests.is_lio()); - self.0.register(registry.as_raw_fd(), usize::from(token)); + self.0.register(registry.as_fd(), usize::from(token)); Ok(()) } } diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs index c9a7302d3fc..e94de36f61f 100644 --- a/tokio/tests/io_async_fd.rs +++ b/tokio/tests/io_async_fd.rs @@ -10,6 +10,7 @@ use std::time::Duration; use std::{ future::Future, io::{self, ErrorKind, Read, Write}, + os::fd::OwnedFd, task::{Context, Waker}, }; @@ -69,7 +70,7 @@ impl AsRawFd for FileDescriptor { impl Read for &FileDescriptor { fn read(&mut self, buf: &mut [u8]) -> io::Result { - read(self.fd.as_raw_fd(), buf).map_err(io::Error::from) + read(&self.fd, buf).map_err(io::Error::from) } } @@ -99,7 +100,7 @@ impl Write for FileDescriptor { } } -fn set_nonblocking(fd: RawFd) { +fn set_nonblocking(fd: &OwnedFd) { use nix::fcntl::{OFlag, F_GETFL, F_SETFL}; let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)"); @@ -129,8 +130,8 @@ fn socketpair() -> (FileDescriptor, FileDescriptor) { .expect("socketpair"); let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b }); - set_nonblocking(fds.0.fd.as_raw_fd()); - set_nonblocking(fds.1.fd.as_raw_fd()); + set_nonblocking(&fds.0.fd); + set_nonblocking(&fds.1.fd); fds } diff --git a/tokio/tests/io_async_fd_memory_leak.rs b/tokio/tests/io_async_fd_memory_leak.rs index 62f368f41c5..fdf99238d13 100644 --- a/tokio/tests/io_async_fd_memory_leak.rs +++ b/tokio/tests/io_async_fd_memory_leak.rs @@ -61,6 +61,7 @@ fn allocated_bytes() -> usize { #[tokio::test] async fn memory_leak_when_fd_closed_before_drop() { use nix::sys::socket::{self, AddressFamily, SockFlag, SockType}; + use std::os::fd::OwnedFd; use std::os::unix::io::{AsRawFd, RawFd}; use std::sync::Arc; use tokio::io::unix::AsyncFd; @@ -83,7 +84,7 @@ async fn memory_leak_when_fd_closed_before_drop() { } } - fn set_nonblocking(fd: RawFd) { + fn set_nonblocking(fd: &OwnedFd) { use nix::fcntl::{OFlag, F_GETFL, F_SETFL}; let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFL)"); @@ -119,7 +120,7 @@ async fn memory_leak_when_fd_closed_before_drop() { .unwrap(); let raw_fd = fd_a.as_raw_fd(); - set_nonblocking(raw_fd); + set_nonblocking(&fd_a); std::mem::forget(fd_a); let wrapper = Arc::new(RawFdWrapper { fd: raw_fd }); @@ -148,7 +149,7 @@ async fn memory_leak_when_fd_closed_before_drop() { .unwrap(); let raw_fd = fd_a.as_raw_fd(); - set_nonblocking(raw_fd); + set_nonblocking(&fd_a); std::mem::forget(fd_a); let wrapper = Arc::new(RawFdWrapper { fd: raw_fd }); @@ -175,7 +176,7 @@ async fn memory_leak_when_fd_closed_before_drop() { .unwrap(); let raw_fd = fd_a.as_raw_fd(); - set_nonblocking(raw_fd); + set_nonblocking(&fd_a); std::mem::forget(fd_a); let wrapper = Arc::new(RawFdWrapper { fd: raw_fd }); diff --git a/tokio/tests/io_poll_aio.rs b/tokio/tests/io_poll_aio.rs index 242887eb60f..45fbd9d3002 100644 --- a/tokio/tests/io_poll_aio.rs +++ b/tokio/tests/io_poll_aio.rs @@ -5,8 +5,8 @@ use mio_aio::{AioFsyncMode, SourceApi}; use std::{ future::Future, io, mem, - os::fd::AsFd, - os::unix::io::{AsRawFd, RawFd}, + os::fd::{AsFd, BorrowedFd}, + os::unix::io::AsRawFd, pin::{pin, Pin}, task::{Context, Poll}, }; @@ -21,7 +21,7 @@ mod aio { struct TokioSource<'fd>(mio_aio::Source>); impl<'fd> AioSource for TokioSource<'fd> { - fn register(&mut self, kq: RawFd, token: usize) { + fn register(&mut self, kq: BorrowedFd<'_>, token: usize) { self.0.register_raw(kq, token) } fn deregister(&mut self) { @@ -81,10 +81,10 @@ mod aio { } impl AioSource for LlSource { - fn register(&mut self, kq: RawFd, token: usize) { + fn register(&mut self, kq: BorrowedFd<'_>, token: usize) { let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() }; sev.sigev_notify = libc::SIGEV_KEVENT; - sev.sigev_signo = kq; + sev.sigev_signo = kq.as_raw_fd(); sev.sigev_value = libc::sigval { sival_ptr: token as *mut libc::c_void, }; @@ -222,10 +222,10 @@ mod lio { } impl<'a> AioSource for LioSource<'a> { - fn register(&mut self, kq: RawFd, token: usize) { + fn register(&mut self, kq: BorrowedFd<'_>, token: usize) { let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() }; sev.sigev_notify = libc::SIGEV_KEVENT; - sev.sigev_signo = kq; + sev.sigev_signo = kq.as_raw_fd(); sev.sigev_value = libc::sigval { sival_ptr: token as *mut libc::c_void, }; diff --git a/tokio/tests/net_unix_pipe.rs b/tokio/tests/net_unix_pipe.rs index f95eb19989d..c35b1c81fb5 100644 --- a/tokio/tests/net_unix_pipe.rs +++ b/tokio/tests/net_unix_pipe.rs @@ -8,8 +8,8 @@ use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_ok}; use std::fs::File; use std::io; +use std::os::fd::AsFd; use std::os::unix::fs::OpenOptionsExt; -use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; /// Helper struct which will clean up temporary files once dropped. @@ -277,8 +277,8 @@ async fn from_file_detects_wrong_access_mode() -> io::Result<()> { Ok(()) } -fn is_nonblocking(fd: &T) -> io::Result { - let flags = nix::fcntl::fcntl(fd.as_raw_fd(), nix::fcntl::F_GETFL)?; +fn is_nonblocking(fd: &T) -> io::Result { + let flags = nix::fcntl::fcntl(fd.as_fd(), nix::fcntl::F_GETFL)?; Ok((flags & libc::O_NONBLOCK) != 0) } From b19e713b5e9cf7346b1efab7407b4750e440b7c2 Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Fri, 3 Apr 2026 16:38:51 -0600 Subject: [PATCH 2/4] squash: Provide backwards-compatibility with older AioSource implementors Older implementors will continue to work without IO Safety. But newer implementors should implement AioSource::register_borrowed instead of AioSource::register. --- tokio/src/io/bsd/poll_aio.rs | 28 ++++++++++++++++++++++++---- tokio/tests/io_poll_aio.rs | 6 +++--- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/tokio/src/io/bsd/poll_aio.rs b/tokio/src/io/bsd/poll_aio.rs index 834475e500d..b1808f2cf22 100644 --- a/tokio/src/io/bsd/poll_aio.rs +++ b/tokio/src/io/bsd/poll_aio.rs @@ -10,15 +10,35 @@ use std::fmt; use std::io; use std::ops::{Deref, DerefMut}; use std::os::fd::{AsFd, BorrowedFd}; +use std::os::unix::io::{AsRawFd, RawFd}; use std::task::{ready, Context, Poll}; /// Like [`mio::event::Source`], but for POSIX AIO only. /// /// Tokio's consumer must pass an implementor of this trait to create a -/// [`Aio`] object. +/// [`Aio`] object. Implementors must implement at least one of [`AioSource::register`] and +/// [`AioSource::register_borrowed`]. pub trait AioSource { /// Registers this AIO event source with Tokio's reactor. - fn register(&mut self, kq: BorrowedFd<'_>, token: usize); + /// + /// # Safety + /// + /// It's memory-safe safe, but not I/O safe. If the file referenced by `kq` gets dropped, then + /// this source may end up notifying the wrong file. + #[deprecated(since = "1.51.0", note = "use register_borrowed instead")] + fn register(&mut self, _kq: RawFd, _token: usize) { + // This default implementation exists so new AioSource implementors that implement the + // register_borrowed method can compile without the need to implement register. + unimplemented!("Use AioSource::register_borrowed instead") + } + + /// Registers this AIO event source with Tokio's reactor. + fn register_borrowed(&mut self, kq: BorrowedFd<'_>, token: usize) { + // This default implementation serves to provide backwards compatibility with AioSource + // implementors written before 1.51.0 that only implemented the unsafe `register` method. + #[allow(deprecated)] + self.register(kq.as_raw_fd(), token) + } /// Deregisters this AIO event source with Tokio's reactor. fn deregister(&mut self); @@ -36,7 +56,7 @@ impl Source for MioSource { interests: mio::Interest, ) -> io::Result<()> { assert!(interests.is_aio() || interests.is_lio()); - self.0.register(registry.as_fd(), usize::from(token)); + self.0.register_borrowed(registry.as_fd(), usize::from(token)); Ok(()) } @@ -52,7 +72,7 @@ impl Source for MioSource { interests: mio::Interest, ) -> io::Result<()> { assert!(interests.is_aio() || interests.is_lio()); - self.0.register(registry.as_fd(), usize::from(token)); + self.0.register_borrowed(registry.as_fd(), usize::from(token)); Ok(()) } } diff --git a/tokio/tests/io_poll_aio.rs b/tokio/tests/io_poll_aio.rs index 45fbd9d3002..64d8bf71f84 100644 --- a/tokio/tests/io_poll_aio.rs +++ b/tokio/tests/io_poll_aio.rs @@ -21,7 +21,7 @@ mod aio { struct TokioSource<'fd>(mio_aio::Source>); impl<'fd> AioSource for TokioSource<'fd> { - fn register(&mut self, kq: BorrowedFd<'_>, token: usize) { + fn register_borrowed(&mut self, kq: BorrowedFd<'_>, token: usize) { self.0.register_raw(kq, token) } fn deregister(&mut self) { @@ -81,7 +81,7 @@ mod aio { } impl AioSource for LlSource { - fn register(&mut self, kq: BorrowedFd<'_>, token: usize) { + fn register_borrowed(&mut self, kq: BorrowedFd<'_>, token: usize) { let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() }; sev.sigev_notify = libc::SIGEV_KEVENT; sev.sigev_signo = kq.as_raw_fd(); @@ -222,7 +222,7 @@ mod lio { } impl<'a> AioSource for LioSource<'a> { - fn register(&mut self, kq: BorrowedFd<'_>, token: usize) { + fn register_borrowed(&mut self, kq: BorrowedFd<'_>, token: usize) { let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() }; sev.sigev_notify = libc::SIGEV_KEVENT; sev.sigev_signo = kq.as_raw_fd(); From e397fc427fa3009ef3ecfbea88c4208658df52ec Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Fri, 3 Apr 2026 16:41:16 -0600 Subject: [PATCH 3/4] Error message spelling fixes --- tokio/tests/io_async_fd.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs index e94de36f61f..c35c8b6a23e 100644 --- a/tokio/tests/io_async_fd.rs +++ b/tokio/tests/io_async_fd.rs @@ -103,7 +103,7 @@ impl Write for FileDescriptor { fn set_nonblocking(fd: &OwnedFd) { use nix::fcntl::{OFlag, F_GETFL, F_SETFL}; - let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)"); + let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFL)"); if flags < 0 { panic!( @@ -115,7 +115,7 @@ fn set_nonblocking(fd: &OwnedFd) { let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK; - nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)"); + nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFL)"); } fn socketpair() -> (FileDescriptor, FileDescriptor) { From 5c1df99b5e8fb9b519eb75ac822ce963fa80396a Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Fri, 3 Apr 2026 16:59:55 -0600 Subject: [PATCH 4/4] fixup! squash: Provide backwards-compatibility with older AioSource implementors --- tokio/src/io/bsd/poll_aio.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tokio/src/io/bsd/poll_aio.rs b/tokio/src/io/bsd/poll_aio.rs index b1808f2cf22..cad27d6cebf 100644 --- a/tokio/src/io/bsd/poll_aio.rs +++ b/tokio/src/io/bsd/poll_aio.rs @@ -56,7 +56,8 @@ impl Source for MioSource { interests: mio::Interest, ) -> io::Result<()> { assert!(interests.is_aio() || interests.is_lio()); - self.0.register_borrowed(registry.as_fd(), usize::from(token)); + self.0 + .register_borrowed(registry.as_fd(), usize::from(token)); Ok(()) } @@ -72,7 +73,8 @@ impl Source for MioSource { interests: mio::Interest, ) -> io::Result<()> { assert!(interests.is_aio() || interests.is_lio()); - self.0.register_borrowed(registry.as_fd(), usize::from(token)); + self.0 + .register_borrowed(registry.as_fd(), usize::from(token)); Ok(()) } }