Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 57 additions & 105 deletions tests/pass-dep/libc/libc-epoll-blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::thread;

#[path = "../../utils/libc.rs"]
mod libc_utils;
use libc_utils::epoll::*;
use libc_utils::*;

// This is a set of testcases for blocking epoll.

Expand All @@ -20,47 +22,22 @@ fn main() {
}

// Using `as` cast since `EPOLLET` wraps around
const EPOLL_IN_OUT_ET: u32 = (libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLET) as _;

#[track_caller]
fn check_epoll_wait<const N: usize>(
epfd: i32,
expected_notifications: &[(u32, u64)],
timeout: i32,
) {
let epoll_event = libc::epoll_event { events: 0, u64: 0 };
let mut array: [libc::epoll_event; N] = [epoll_event; N];
let maxsize = N;
let array_ptr = array.as_mut_ptr();
let res = unsafe { libc::epoll_wait(epfd, array_ptr, maxsize.try_into().unwrap(), timeout) };
if res < 0 {
panic!("epoll_wait failed: {}", std::io::Error::last_os_error());
}
let got_notifications =
unsafe { std::slice::from_raw_parts(array_ptr, res.try_into().unwrap()) };
let got_notifications = got_notifications.iter().map(|e| (e.events, e.u64)).collect::<Vec<_>>();
assert_eq!(got_notifications, expected_notifications, "got wrong notifications");
}
const EPOLL_IN_OUT_ET: i32 = (libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLET) as _;

// This test allows epoll_wait to block, then unblock without notification.
fn test_epoll_block_without_notification() {
// Create an epoll instance.
let epfd = unsafe { libc::epoll_create1(0) };
assert_ne!(epfd, -1);
let epfd = errno_result(unsafe { libc::epoll_create1(0) }).unwrap();

// Create an eventfd instances.
let flags = libc::EFD_NONBLOCK | libc::EFD_CLOEXEC;
let fd = unsafe { libc::eventfd(0, flags) };
let fd = errno_result(unsafe { libc::eventfd(0, flags) }).unwrap();

// Register eventfd with epoll.
let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fd as u64 };
let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd, &mut ev) };
assert_eq!(res, 0);
epoll_ctl_add(epfd, fd, EPOLL_IN_OUT_ET).unwrap();

// epoll_wait to clear notification.
let expected_event = u32::try_from(libc::EPOLLOUT).unwrap();
let expected_value = fd as u64;
check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 0);
check_epoll_wait::<1>(epfd, &[Ev { events: libc::EPOLLOUT as _, data: fd }], 0);

// This epoll wait blocks, and timeout without notification.
check_epoll_wait::<1>(epfd, &[], 5);
Expand All @@ -69,102 +46,86 @@ fn test_epoll_block_without_notification() {
// This test triggers notification and unblocks the epoll_wait before timeout.
fn test_epoll_block_then_unblock() {
// Create an epoll instance.
let epfd = unsafe { libc::epoll_create1(0) };
assert_ne!(epfd, -1);
let epfd = errno_result(unsafe { libc::epoll_create1(0) }).unwrap();

// Create a socketpair instance.
let mut fds = [-1, -1];
let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
assert_eq!(res, 0);
errno_check(unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) });

// Register one side of the socketpair with epoll.
let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fds[0] as u64 };
let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fds[0], &mut ev) };
assert_eq!(res, 0);
epoll_ctl_add(epfd, fds[0], EPOLL_IN_OUT_ET).unwrap();

// epoll_wait to clear notification.
let expected_event = u32::try_from(libc::EPOLLOUT).unwrap();
let expected_value = fds[0] as u64;
check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 0);
check_epoll_wait::<1>(epfd, &[Ev { events: libc::EPOLLOUT as _, data: fds[0] }], 0);

// epoll_wait before triggering notification so it will block then get unblocked before timeout.
let expected_event = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap();
let expected_value = fds[0] as u64;
let thread1 = thread::spawn(move || {
thread::yield_now();
let data = "abcde".as_bytes().as_ptr();
let res = unsafe { libc_utils::write_all(fds[1], data as *const libc::c_void, 5) };
assert_eq!(res, 5);
write_all_from_slice(fds[1], b"abcde").unwrap();
});
check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 10);
check_epoll_wait::<1>(
epfd,
&[Ev { events: (libc::EPOLLIN | libc::EPOLLOUT) as _, data: fds[0] }],
10,
);
thread1.join().unwrap();
}

// This test triggers a notification after epoll_wait times out.
fn test_notification_after_timeout() {
// Create an epoll instance.
let epfd = unsafe { libc::epoll_create1(0) };
assert_ne!(epfd, -1);
let epfd = errno_result(unsafe { libc::epoll_create1(0) }).unwrap();

// Create a socketpair instance.
let mut fds = [-1, -1];
let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
assert_eq!(res, 0);
errno_check(unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) });

// Register one side of the socketpair with epoll.
let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fds[0] as u64 };
let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fds[0], &mut ev) };
assert_eq!(res, 0);
epoll_ctl_add(epfd, fds[0], EPOLL_IN_OUT_ET).unwrap();

// epoll_wait to clear notification.
let expected_event = u32::try_from(libc::EPOLLOUT).unwrap();
let expected_value = fds[0] as u64;
check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 0);
check_epoll_wait::<1>(epfd, &[Ev { events: libc::EPOLLOUT as _, data: fds[0] }], 0);

// epoll_wait timeouts without notification.
check_epoll_wait::<1>(epfd, &[], 10);

// Trigger epoll notification after timeout.
let data = "abcde".as_bytes().as_ptr();
let res = unsafe { libc_utils::write_all(fds[1], data as *const libc::c_void, 5) };
assert_eq!(res, 5);
write_all_from_slice(fds[1], b"abcde").unwrap();

// Check the result of the notification.
let expected_event = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap();
let expected_value = fds[0] as u64;
check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 10);
check_epoll_wait::<1>(
epfd,
&[Ev { events: (libc::EPOLLIN | libc::EPOLLOUT) as _, data: fds[0] }],
10,
);
}

// This test shows a data_race before epoll had vector clocks added.
fn test_epoll_race() {
// Create an epoll instance.
let epfd = unsafe { libc::epoll_create1(0) };
assert_ne!(epfd, -1);
let epfd = errno_result(unsafe { libc::epoll_create1(0) }).unwrap();

// Create an eventfd instance.
let flags = libc::EFD_NONBLOCK | libc::EFD_CLOEXEC;
let fd = unsafe { libc::eventfd(0, flags) };
let fd = errno_result(unsafe { libc::eventfd(0, flags) }).unwrap();

// Register eventfd with the epoll instance.
let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fd as u64 };
let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd, &mut ev) };
assert_eq!(res, 0);
epoll_ctl_add(epfd, fd, EPOLL_IN_OUT_ET).unwrap();

static mut VAL: u8 = 0;
let thread1 = thread::spawn(move || {
// Write to the static mut variable.
unsafe { VAL = 1 };
// Write to the eventfd instance.
let sized_8_data: [u8; 8] = 1_u64.to_ne_bytes();
let res = unsafe { libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8) };
// write returns number of bytes written, which is always 8.
assert_eq!(res, 8);
write_all_from_slice(fd, &1_u64.to_ne_bytes()).unwrap();
});
thread::yield_now();
// epoll_wait for the event to happen.
let expected_event = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap();
let expected_value = u64::try_from(fd).unwrap();
check_epoll_wait::<8>(epfd, &[(expected_event, expected_value)], -1);
check_epoll_wait::<8>(
epfd,
&[Ev { events: (libc::EPOLLIN | libc::EPOLLOUT) as _, data: fd }],
-1,
);
// Read from the static mut variable.
#[allow(static_mut_refs)]
unsafe {
Expand All @@ -177,35 +138,28 @@ fn test_epoll_race() {
/// epoll it is blocked on.
fn wakeup_on_new_interest() {
// Create an epoll instance.
let epfd = unsafe { libc::epoll_create1(0) };
assert_ne!(epfd, -1);
let epfd = errno_result(unsafe { libc::epoll_create1(0) }).unwrap();

// Create a socketpair instance.
let mut fds = [-1, -1];
let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
assert_eq!(res, 0);
errno_check(unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) });

// Write to fd[0]
let data = "abcde".as_bytes().as_ptr();
let res = unsafe { libc_utils::write_all(fds[0], data as *const libc::c_void, 5) };
assert_eq!(res, 5);
write_all_from_slice(fds[0], b"abcde").unwrap();

// Block a thread on the epoll instance.
let t = std::thread::spawn(move || {
let expected_event = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap();
let expected_value = u64::try_from(fds[1]).unwrap();
check_epoll_wait::<8>(epfd, &[(expected_event, expected_value)], -1);
check_epoll_wait::<8>(
epfd,
&[Ev { events: (libc::EPOLLIN | libc::EPOLLOUT) as _, data: fds[1] }],
-1,
);
});
// Ensure the thread is blocked.
std::thread::yield_now();

// Register fd[1] with EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP
let mut ev = libc::epoll_event {
events: (libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLET | libc::EPOLLRDHUP) as _,
u64: u64::try_from(fds[1]).unwrap(),
};
let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fds[1], &mut ev) };
assert_eq!(res, 0);
epoll_ctl_add(epfd, fds[1], EPOLL_IN_OUT_ET | libc::EPOLLRDHUP as i32).unwrap();

// This should wake up the thread.
t.join().unwrap();
Expand All @@ -215,45 +169,43 @@ fn wakeup_on_new_interest() {
/// to consume them all.
fn multiple_events_wake_multiple_threads() {
// Create an epoll instance.
let epfd = unsafe { libc::epoll_create1(0) };
assert_ne!(epfd, -1);
let epfd = errno_result(unsafe { libc::epoll_create1(0) }).unwrap();

// Create an eventfd instance.
let flags = libc::EFD_NONBLOCK | libc::EFD_CLOEXEC;
let fd1 = unsafe { libc::eventfd(0, flags) };
let fd1 = errno_result(unsafe { libc::eventfd(0, flags) }).unwrap();
// Make a duplicate so that we have two file descriptors for the same file description.
let fd2 = unsafe { libc::dup(fd1) };
let fd2 = errno_result(unsafe { libc::dup(fd1) }).unwrap();

// Register both with epoll.
let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fd1 as u64 };
let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd1, &mut ev) };
assert_eq!(res, 0);
let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fd2 as u64 };
let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd2, &mut ev) };
assert_eq!(res, 0);
epoll_ctl_add(epfd, fd1, EPOLL_IN_OUT_ET).unwrap();
epoll_ctl_add(epfd, fd2, EPOLL_IN_OUT_ET).unwrap();

// Consume the initial events.
let expected = [(libc::EPOLLOUT as u32, fd1 as u64), (libc::EPOLLOUT as u32, fd2 as u64)];
let expected = [
Ev { events: libc::EPOLLOUT as _, data: fd1 },
Ev { events: libc::EPOLLOUT as _, data: fd2 },
];
check_epoll_wait::<8>(epfd, &expected, -1);

// Block two threads on the epoll, both wanting to get just one event.
let t1 = thread::spawn(move || {
let mut e = libc::epoll_event { events: 0, u64: 0 };
let res = unsafe { libc::epoll_wait(epfd, &raw mut e, 1, -1) };
assert!(res == 1);
(e.events, e.u64)
Ev { events: e.events.cast_signed(), data: e.u64.try_into().unwrap() }
});
let t2 = thread::spawn(move || {
let mut e = libc::epoll_event { events: 0, u64: 0 };
let res = unsafe { libc::epoll_wait(epfd, &raw mut e, 1, -1) };
assert!(res == 1);
(e.events, e.u64)
Ev { events: e.events.cast_signed(), data: e.u64.try_into().unwrap() }
});
// Yield so both threads are waiting now.
thread::yield_now();

// Trigger the eventfd. This triggers two events at once!
libc_utils::write_all_from_slice(fd1, &0_u64.to_ne_bytes()).unwrap();
write_all_from_slice(fd1, &0_u64.to_ne_bytes()).unwrap();

// Both threads should have been woken up so that both events can be consumed.
let e1 = t1.join().unwrap();
Expand Down
11 changes: 8 additions & 3 deletions tests/utils/libc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub mod epoll {

/// The libc epoll_event type doesn't fit to the EPOLLIN etc constants, so we have our
/// own type. We also make the data field an int since we typically want to store FDs there.
#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone, Copy)]
pub struct Ev {
pub events: c_int,
pub data: c_int,
Expand All @@ -120,10 +120,10 @@ pub mod epoll {
}

#[track_caller]
pub fn check_epoll_wait_noblock<const N: usize>(epfd: i32, expected: &[Ev]) {
pub fn check_epoll_wait<const N: usize>(epfd: i32, expected: &[Ev], timeout: i32) {
let mut array: [libc::epoll_event; N] = [libc::epoll_event { events: 0, u64: 0 }; N];
let num = errno_result(unsafe {
libc::epoll_wait(epfd, array.as_mut_ptr(), N.try_into().unwrap(), 0)
libc::epoll_wait(epfd, array.as_mut_ptr(), N.try_into().unwrap(), timeout)
})
.expect("epoll_wait returned an error");
let got = &mut array[..num.try_into().unwrap()];
Expand All @@ -133,4 +133,9 @@ pub mod epoll {
.collect::<Vec<_>>();
assert_eq!(got, expected, "got wrong notifications");
}

#[track_caller]
pub fn check_epoll_wait_noblock<const N: usize>(epfd: i32, expected: &[Ev]) {
check_epoll_wait::<N>(epfd, expected, 0);
}
}