Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pin-project-lite = "0.2.11"

# Everything else is optional...
bytes = { version = "1.2.1", optional = true }
mio = { version = "1.0.1", optional = true, default-features = false }
mio = { version = "1.2.0", optional = true, default-features = false }
parking_lot = { version = "0.12.0", optional = true }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
Expand All @@ -112,7 +112,7 @@ tracing = { version = "0.1.29", default-features = false, features = ["std"], op
[target.'cfg(all(tokio_unstable, target_os = "linux"))'.dependencies]
io-uring = { version = "0.7.11", default-features = false, optional = true }
libc = { version = "0.2.168", optional = true }
mio = { version = "1.0.1", default-features = false, features = ["os-poll", "os-ext"], optional = true }
mio = { version = "1.2.0", default-features = false, features = ["os-poll", "os-ext"], optional = true }
slab = { version = "0.4.9", optional = true }
backtrace = { version = "0.3.58", optional = true }

Expand All @@ -122,7 +122,7 @@ signal-hook-registry = { version = "1.1.1", optional = true }

[target.'cfg(unix)'.dev-dependencies]
libc = { version = "0.2.168" }
nix = { version = "0.29.0", default-features = false, features = ["aio", "fs", "socket"] }
nix = { version = "0.31.0", default-features = false, features = ["aio", "fs", "socket"] }

[target.'cfg(windows)'.dependencies.windows-sys]
version = "0.61"
Expand Down Expand Up @@ -157,7 +157,7 @@ rand = "0.9"
wasm-bindgen-test = "0.3.0"

[target.'cfg(target_os = "freebsd")'.dev-dependencies]
mio-aio = { version = "1", features = ["tokio"] }
mio-aio = { version = "2", features = ["tokio"] }

[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.7", features = ["futures", "checkpoint"] }
Expand Down
33 changes: 27 additions & 6 deletions tokio/src/io/bsd/poll_aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,36 @@ use mio::Token;
use std::fmt;
use std::io;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::RawFd;
use std::os::fd::{AsFd, BorrowedFd};
use std::os::unix::io::{AsRawFd, RawFd};
use std::task::{ready, Context, Poll};

/// Like [`mio::event::Source`], but for POSIX AIO only.
///
/// Tokio's consumer must pass an implementor of this trait to create a
/// [`Aio`] object.
/// [`Aio`] object. Implementors must implement at least one of [`AioSource::register`] and
/// [`AioSource::register_borrowed`].
pub trait AioSource {
/// Registers this AIO event source with Tokio's reactor.
fn register(&mut self, kq: RawFd, token: usize);
///
/// # Safety
///
/// It's memory-safe safe, but not I/O safe. If the file referenced by `kq` gets dropped, then
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is a typo in the documentation: "memory-safe safe" should be "memory-safe".

Suggested change
/// It's memory-safe safe, but not I/O safe. If the file referenced by `kq` gets dropped, then
/// It's memory-safe, but not I/O safe. If the file referenced by `kq` gets dropped, then

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:documentation; feedback: The Gemini AI reviewer is correct! memory-safe safe sounds wrong. One of the occurrences of safe should be removed.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tokio/src/io/bsd/poll_aio.rs:26: Doc typo: “memory-safe safe” reads like an accidental duplication in the # Safety text.

Severity: low

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:documentation; feedback: The Augment AI reviewer is correct! memory-safe safe sounds wrong. One of the occurrences of safe should be removed.

/// this source may end up notifying the wrong file.
#[deprecated(since = "1.51.0", note = "use register_borrowed instead")]
fn register(&mut self, _kq: RawFd, _token: usize) {
// This default implementation exists so new AioSource implementors that implement the
// register_borrowed method can compile without the need to implement register.
unimplemented!("Use AioSource::register_borrowed instead")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tokio/src/io/bsd/poll_aio.rs:32: Because both register and register_borrowed have default implementations, an AioSource impl that forgets to override either will compile but panic at runtime when Tokio registers it (via this unimplemented!). Since AioSource is public, this is a fairly easy footgun for downstream implementors.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:incorrect-but-reasonable; category:bug; feedback: The Augment AI reviewer is not correct! The important function now is register_borrowed(). By default it just delegates to register(), so that all old implementors still work. They will see a deprecation warning but their build will still pass and the functionality will work at runtime. Any new implementors may not override neither of the functions because both of them have default implementations but they will quickly realize that they need to override register_borrowed() on the first attempt to use the function. Hopefully they will test the new implementation before using it in production.

}

/// Registers this AIO event source with Tokio's reactor.
fn register_borrowed(&mut self, kq: BorrowedFd<'_>, token: usize) {
// This default implementation serves to provide backwards compatibility with AioSource
// implementors written before 1.51.0 that only implemented the unsafe `register` method.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

register_borrowed’s compat comment says older implementors “only implemented the unsafe register method”, but register isn’t declared unsafe here (it’s deprecated, but still safe). Consider adjusting the wording so the docs don’t imply an unsafe contract that doesn’t exist.

Severity: low

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:incorrect-but-reasonable; category:bug; feedback: The Augment AI reviewer is not correct! Here unsafe is confused with Rust's unsafe keyword while it actually means that the usage of RawFD is more unsafe than using BorrowedFS/OwnedFD.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tokio/src/io/bsd/poll_aio.rs:38: This comment describes the deprecated register as “unsafe”, but the method isn’t actually unsafe (it’s just not I/O-safe), which could mislead implementors reading the compatibility note.

Severity: low

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:incorrect-but-reasonable; category:bug; feedback: The Augment AI reviewer is not correct! Here unsafe is confused with Rust's unsafe keyword while it actually means that the usage of RawFD is more unsafe than using BorrowedFS/OwnedFD.

#[allow(deprecated)]
self.register(kq.as_raw_fd(), token)
}

/// Deregisters this AIO event source with Tokio's reactor.
fn deregister(&mut self);
Expand All @@ -37,7 +56,8 @@ impl<T: AioSource> Source for MioSource<T> {
interests: mio::Interest,
) -> io::Result<()> {
assert!(interests.is_aio() || interests.is_lio());
self.0.register(registry.as_raw_fd(), usize::from(token));
self.0
.register_borrowed(registry.as_fd(), usize::from(token));
Ok(())
}

Expand All @@ -53,7 +73,8 @@ impl<T: AioSource> Source for MioSource<T> {
interests: mio::Interest,
) -> io::Result<()> {
assert!(interests.is_aio() || interests.is_lio());
self.0.register(registry.as_raw_fd(), usize::from(token));
self.0
.register_borrowed(registry.as_fd(), usize::from(token));
Ok(())
}
}
Expand Down
13 changes: 7 additions & 6 deletions tokio/tests/io_async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::time::Duration;
use std::{
future::Future,
io::{self, ErrorKind, Read, Write},
os::fd::OwnedFd,
task::{Context, Waker},
};

Expand Down Expand Up @@ -69,7 +70,7 @@ impl AsRawFd for FileDescriptor {

impl Read for &FileDescriptor {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
read(self.fd.as_raw_fd(), buf).map_err(io::Error::from)
read(&self.fd, buf).map_err(io::Error::from)
}
}

Expand Down Expand Up @@ -99,10 +100,10 @@ impl Write for FileDescriptor {
}
}

fn set_nonblocking(fd: RawFd) {
fn set_nonblocking(fd: &OwnedFd) {
use nix::fcntl::{OFlag, F_GETFL, F_SETFL};

let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)");
let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFL)");

if flags < 0 {
panic!(
Expand All @@ -114,7 +115,7 @@ fn set_nonblocking(fd: RawFd) {

let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;

nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)");
nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFL)");
}

fn socketpair() -> (FileDescriptor, FileDescriptor) {
Expand All @@ -129,8 +130,8 @@ fn socketpair() -> (FileDescriptor, FileDescriptor) {
.expect("socketpair");
let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });

set_nonblocking(fds.0.fd.as_raw_fd());
set_nonblocking(fds.1.fd.as_raw_fd());
set_nonblocking(&fds.0.fd);
set_nonblocking(&fds.1.fd);

fds
}
Expand Down
9 changes: 5 additions & 4 deletions tokio/tests/io_async_fd_memory_leak.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ fn allocated_bytes() -> usize {
#[tokio::test]
async fn memory_leak_when_fd_closed_before_drop() {
use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};
use std::os::fd::OwnedFd;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;
use tokio::io::unix::AsyncFd;
Expand All @@ -83,7 +84,7 @@ async fn memory_leak_when_fd_closed_before_drop() {
}
}

fn set_nonblocking(fd: RawFd) {
fn set_nonblocking(fd: &OwnedFd) {
use nix::fcntl::{OFlag, F_GETFL, F_SETFL};

let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFL)");
Expand Down Expand Up @@ -119,7 +120,7 @@ async fn memory_leak_when_fd_closed_before_drop() {
.unwrap();

let raw_fd = fd_a.as_raw_fd();
set_nonblocking(raw_fd);
set_nonblocking(&fd_a);
std::mem::forget(fd_a);

let wrapper = Arc::new(RawFdWrapper { fd: raw_fd });
Expand Down Expand Up @@ -148,7 +149,7 @@ async fn memory_leak_when_fd_closed_before_drop() {
.unwrap();

let raw_fd = fd_a.as_raw_fd();
set_nonblocking(raw_fd);
set_nonblocking(&fd_a);
std::mem::forget(fd_a);

let wrapper = Arc::new(RawFdWrapper { fd: raw_fd });
Expand All @@ -175,7 +176,7 @@ async fn memory_leak_when_fd_closed_before_drop() {
.unwrap();

let raw_fd = fd_a.as_raw_fd();
set_nonblocking(raw_fd);
set_nonblocking(&fd_a);
std::mem::forget(fd_a);

let wrapper = Arc::new(RawFdWrapper { fd: raw_fd });
Expand Down
14 changes: 7 additions & 7 deletions tokio/tests/io_poll_aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use mio_aio::{AioFsyncMode, SourceApi};
use std::{
future::Future,
io, mem,
os::fd::AsFd,
os::unix::io::{AsRawFd, RawFd},
os::fd::{AsFd, BorrowedFd},
os::unix::io::AsRawFd,
pin::{pin, Pin},
task::{Context, Poll},
};
Expand All @@ -21,7 +21,7 @@ mod aio {
struct TokioSource<'fd>(mio_aio::Source<nix::sys::aio::AioFsync<'fd>>);

impl<'fd> AioSource for TokioSource<'fd> {
fn register(&mut self, kq: RawFd, token: usize) {
fn register_borrowed(&mut self, kq: BorrowedFd<'_>, token: usize) {
self.0.register_raw(kq, token)
}
fn deregister(&mut self) {
Expand Down Expand Up @@ -81,10 +81,10 @@ mod aio {
}

impl AioSource for LlSource {
fn register(&mut self, kq: RawFd, token: usize) {
fn register_borrowed(&mut self, kq: BorrowedFd<'_>, token: usize) {
let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
sev.sigev_notify = libc::SIGEV_KEVENT;
sev.sigev_signo = kq;
sev.sigev_signo = kq.as_raw_fd();
sev.sigev_value = libc::sigval {
sival_ptr: token as *mut libc::c_void,
};
Expand Down Expand Up @@ -222,10 +222,10 @@ mod lio {
}

impl<'a> AioSource for LioSource<'a> {
fn register(&mut self, kq: RawFd, token: usize) {
fn register_borrowed(&mut self, kq: BorrowedFd<'_>, token: usize) {
let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
sev.sigev_notify = libc::SIGEV_KEVENT;
sev.sigev_signo = kq;
sev.sigev_signo = kq.as_raw_fd();
sev.sigev_value = libc::sigval {
sival_ptr: token as *mut libc::c_void,
};
Expand Down
6 changes: 3 additions & 3 deletions tokio/tests/net_unix_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_ok};

use std::fs::File;
use std::io;
use std::os::fd::AsFd;
use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};

/// Helper struct which will clean up temporary files once dropped.
Expand Down Expand Up @@ -277,8 +277,8 @@ async fn from_file_detects_wrong_access_mode() -> io::Result<()> {
Ok(())
}

fn is_nonblocking<T: AsRawFd>(fd: &T) -> io::Result<bool> {
let flags = nix::fcntl::fcntl(fd.as_raw_fd(), nix::fcntl::F_GETFL)?;
fn is_nonblocking<T: AsFd>(fd: &T) -> io::Result<bool> {
let flags = nix::fcntl::fcntl(fd.as_fd(), nix::fcntl::F_GETFL)?;
Ok((flags & libc::O_NONBLOCK) != 0)
}

Expand Down
Loading