From a825329d22fd2935a217dd774da3abad7008e6cf Mon Sep 17 00:00:00 2001 From: Caleb Metz Date: Fri, 30 Jan 2026 13:41:42 -0500 Subject: [PATCH 1/2] adds loom tests and yield point in signal_and_wait Signed-off-by: Caleb Metz --- lading_signal/src/lib.rs | 56 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/lading_signal/src/lib.rs b/lading_signal/src/lib.rs index 7b0fb6e3d..83c1b0821 100644 --- a/lading_signal/src/lib.rs +++ b/lading_signal/src/lib.rs @@ -31,6 +31,19 @@ use tokio::sync::{ }; use tracing::info; +// Loom instrumentation: When enabled, `signal_and_wait` will yield in the race +// window between `peers.load()` and `notified().await`. This allows loom to +// explore the interleaving where a watcher's `notify_waiters()` fires in this +// window, exposing the lost wakeup race condition. +// +// This is only used in tests to prove the race exists. In production builds +// (`#[cfg(not(loom))]`), this is compiled out entirely. +#[cfg(loom)] +std::thread_local! { + #[allow(missing_docs)] + pub static LOOM_EXPLORE_RACE_WINDOW: std::cell::Cell = const { std::cell::Cell::new(false) }; +} + /// Construct a `Watcher` and `Broadcaster` pair. #[must_use] pub fn signal() -> (Watcher, Broadcaster) { @@ -95,6 +108,12 @@ impl Broadcaster { // has signaled that it has received the transmitted signal. loop { let peers = self.peers.load(Ordering::SeqCst); + + #[cfg(loom)] + if LOOM_EXPLORE_RACE_WINDOW.get() { + loom::thread::yield_now(); // Force exploration here + } + if peers == 0 { break; } @@ -575,4 +594,41 @@ mod tests { watcher_handle.join().unwrap(); }); } + + #[cfg(loom)] + #[test] + fn signal_and_wait_race_proof_with_yeild() { + use crate::LOOM_EXPLORE_RACE_WINDOW; + use crate::signal; + use loom::{future::block_on, thread}; + + loom::model(|| { + LOOM_EXPLORE_RACE_WINDOW.set(true); // Enable instrumentation + + let (watcher, broadcaster) = signal(); + + let handle = thread::spawn(move || drop(watcher)); + + block_on(broadcaster.signal_and_wait()); // Calls REAL function + + handle.join().unwrap(); + }); + } + + #[cfg(loom)] + #[test] + fn signal_and_wait_race_proof_without_yeild() { + use crate::signal; + use loom::{future::block_on, thread}; + + loom::model(|| { + let (watcher, broadcaster) = signal(); + + let handle = thread::spawn(move || drop(watcher)); + + block_on(broadcaster.signal_and_wait()); // Calls REAL function + + handle.join().unwrap(); + }); + } } From 577f5bac6faede96206ac4a15daf7bcbed135ed3 Mon Sep 17 00:00:00 2001 From: Caleb Metz Date: Fri, 30 Jan 2026 13:47:54 -0500 Subject: [PATCH 2/2] fix race Signed-off-by: Caleb Metz --- lading_signal/src/lib.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lading_signal/src/lib.rs b/lading_signal/src/lib.rs index 83c1b0821..d8de00495 100644 --- a/lading_signal/src/lib.rs +++ b/lading_signal/src/lib.rs @@ -107,6 +107,10 @@ impl Broadcaster { // `decrease_peer_count`: loop will not consume CPU until a `Watcher` // has signaled that it has received the transmitted signal. loop { + // Register interest first + let notified = self.notify.notified(); + + // Check condition let peers = self.peers.load(Ordering::SeqCst); #[cfg(loom)] @@ -118,7 +122,9 @@ impl Broadcaster { break; } info!("Waiting for {peers} peers"); - self.notify.notified().await; + + // Safe to await, we are registered + notified.await; } } }