Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions mea/src/internal/waitset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,29 @@
use std::task::Context;
use std::task::Waker;

use slab::Slab;

#[derive(Debug)]
pub(crate) struct WaitSet {
waiters: Slab<Waker>,
waiters: Vec<Waker>,
}

impl WaitSet {
/// Construct a new, empty wait set.
pub const fn new() -> Self {
Self {
waiters: Slab::new(),
waiters: Vec::new(),
}
}

/// Construct a new, empty wait set with the specified capacity.
pub fn with_capacity(capacity: usize) -> Self {
Self {
waiters: Slab::with_capacity(capacity),
waiters: Vec::with_capacity(capacity),
}
}

/// Drain and wake up all waiters.
pub(crate) fn wake_all(&mut self) {
for w in self.waiters.drain() {
for w in self.waiters.drain(..) {
Copy link
Collaborator

@tisonkun tisonkun Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: when take the whole range of a vec, the following code should be more efficient

Suggested change
for w in self.waiters.drain(..) {
for w in std::mem::take(&mut self.waiters) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wake_all could be called many times, std::mem::take just replaces the Vec with an empty one so that the capacity is lost.

BTW, I noticed the capacity is only set in Barrier::new. Is this necessary?

w.wake();
}
}
Expand All @@ -51,11 +49,11 @@ impl WaitSet {
pub(crate) fn register_waker(&mut self, idx: &mut Option<usize>, cx: &mut Context<'_>) {
match *idx {
None => {
let key = self.waiters.insert(cx.waker().clone());
*idx = Some(key);
self.waiters.push(cx.waker().clone());
*idx = Some(self.waiters.len() - 1);
}
Some(key) => {
if self.waiters.contains(key) {
if key < self.waiters.len() {
if !self.waiters[key].will_wake(cx.waker()) {
self.waiters[key] = cx.waker().clone();
}
Comment on lines +56 to 59
Copy link

Copilot AI Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This index validation check is incorrect and can lead to waker corruption. After wake_all() drains the Vec, the indices from previous registrations can collide with newly registered wakers.

Scenario demonstrating the bug:

  1. Task A registers and gets idx=0, Task B registers and gets idx=1
  2. wake_all() is called, draining the Vec (now empty)
  3. Task C registers for the first time and gets idx=0
  4. Task A is re-polled with its old idx=0
  5. The check 0 < self.waiters.len() (which is 1) passes
  6. Task A's waker overwrites Task C's waker at index 0
  7. Task C will never be woken up

With Slab, the contains(key) method would correctly return false for indices from drained entries, preventing this collision. The simple length check cannot distinguish between "new index 0 after drain" and "old index 0 from before drain".

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slab has the same issue @copilot

    let mut slab = Slab::<i32>::with_capacity(3);
    let idx0 = slab.insert(1);
    println!("Inserted 1 at index {}", idx0);
    let idx1 = slab.insert(2);
    println!("Inserted 2 at index {}", idx1);
    slab.drain().for_each(|v| {
        println!("Drained value: {}", v);
    });
    let idx2 = slab.insert(3);
    println!("Inserted 3 at index {}", idx2);
    let cached = slab.contains(idx0);
    println!("Slab contains index {}: {}", idx0, cached);

Outputs:

running 1 test
Inserted 1 at index 0
Inserted 2 at index 1
Drained value: 1
Drained value: 2
Inserted 3 at index 0
Slab contains index 0: true

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah .. The index is actually a pointer without safety check. It would be better if we can build some dependent type check or more obvious invariants.

Expand All @@ -71,8 +69,8 @@ impl WaitSet {
//
// Barrier holds the lock during check and register, so the race condition
// above won't happen.
let key = self.waiters.insert(cx.waker().clone());
*idx = Some(key);
self.waiters.push(cx.waker().clone());
*idx = Some(self.waiters.len() - 1);
}
}
}
Expand Down