Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ All notable changes to this project will be documented in this file.

* Implement `once::OnceMap` to run computation only once and store the results in a hash map.
* `singleflight::Group` now supports custom hashers for keys.
* `singleflight::Group::remove` now accepts any `&Q` where `Q: ?Sized + Hash + Eq` and `K: Borrow<Q>` aligning with standard HashMap's interface.
* `singleflight::Group::forget` now accepts any `&Q` where `Q: ?Sized + Hash + Eq` and `K: Borrow<Q>` aligning with standard HashMap's interface.

## [0.6.1] - 2026-01-11

Expand Down
6 changes: 6 additions & 0 deletions mea/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,15 @@ mod tests {
use crate::mutex::MutexGuard;
use crate::once::Once;
use crate::once::OnceCell;
use crate::once::OnceMap;
use crate::oneshot;
use crate::rwlock::RwLock;
use crate::rwlock::RwLockReadGuard;
use crate::rwlock::RwLockWriteGuard;
use crate::semaphore::Semaphore;
use crate::shutdown::ShutdownRecv;
use crate::shutdown::ShutdownSend;
use crate::singleflight;
use crate::waitgroup::Wait;
use crate::waitgroup::WaitGroup;

Expand All @@ -123,6 +125,8 @@ mod tests {
do_assert_send_and_sync::<Condvar>();
do_assert_send_and_sync::<Once>();
do_assert_send_and_sync::<OnceCell<u32>>();
do_assert_send_and_sync::<OnceMap<String, u32>>();
do_assert_send_and_sync::<singleflight::Group<String, u32>>();
do_assert_send_and_sync::<Latch>();
do_assert_send_and_sync::<Semaphore>();
do_assert_send_and_sync::<ShutdownSend>();
Expand Down Expand Up @@ -161,6 +165,8 @@ mod tests {
do_assert_unpin::<Latch>();
do_assert_unpin::<Once>();
do_assert_unpin::<OnceCell<u32>>();
do_assert_unpin::<OnceMap<String, u32>>();
do_assert_unpin::<singleflight::Group<String, u32>>();
do_assert_unpin::<Semaphore>();
do_assert_unpin::<ShutdownSend>();
do_assert_unpin::<ShutdownRecv>();
Expand Down
2 changes: 1 addition & 1 deletion mea/src/once/once_map/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ where

/// Remove the given key from the map.
///
/// If you need to get the value that has been remove, use the [`remove`] method instead.
/// If you need to get the value that has been removed, use the [`remove`] method instead.
///
/// [`remove`]: Self::remove
pub fn discard<Q>(&self, key: &Q)
Expand Down
115 changes: 115 additions & 0 deletions mea/src/once/once_map/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::RandomState;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;

use crate::once::OnceMap;

#[test]
fn test_default_and_constructors() {
let _map: OnceMap<String, i32> = OnceMap::default();
let _: OnceMap<String, i32> = OnceMap::new();
let _: OnceMap<String, i32> = OnceMap::with_capacity(10);
let _: OnceMap<String, i32> = OnceMap::with_hasher(RandomState::new());
let _: OnceMap<String, i32> = OnceMap::with_capacity_and_hasher(10, RandomState::new());

// Check capacity (indirectly via debug or just ensure it runs)
let map: OnceMap<String, i32> = OnceMap::with_capacity(100);
assert!(format!("{:?}", map).contains("OnceMap"));
}

#[tokio::test]
async fn test_compute() {
let map = OnceMap::new();
Expand Down Expand Up @@ -70,6 +85,45 @@ async fn test_try_compute() {
assert_eq!(res, Ok(1));
}

#[tokio::test]
async fn test_try_compute_concurrent_failure_then_success() {
let map = Arc::new(OnceMap::new());
let success = Arc::new(AtomicBool::new(false));
let map_clone = map.clone();
let success_clone = success.clone();

// Spawn a task that fails
let t1 = tokio::spawn(async move {
map_clone
.try_compute("key", async move || {
tokio::time::sleep(Duration::from_millis(50)).await;
Err::<i32, _>("fail")
})
.await
});

// Spawn a task that succeeds, but starts slightly later/runs concurrent
let map_clone2 = map.clone();
let t2 = tokio::spawn(async move {
// Wait for t1 to start
tokio::time::sleep(Duration::from_millis(10)).await;
// This should block until t1 fails, then retry (conceptually)
map_clone2
.try_compute("key", async move || {
success_clone.store(true, Ordering::SeqCst);
Ok::<i32, &str>(1)
})
.await
});

let res1 = t1.await.unwrap();
assert_eq!(res1, Err("fail"));

let res2 = t2.await.unwrap();
assert_eq!(res2, Ok(1));
assert!(success.load(Ordering::SeqCst));
}

#[tokio::test]
async fn test_get_remove() {
let map = OnceMap::new();
Expand All @@ -88,10 +142,71 @@ async fn test_get_remove() {
assert_eq!(map.get("key"), None);
}

#[tokio::test]
async fn test_remove_while_computing() {
let map = Arc::new(OnceMap::new());
let map_clone = map.clone();

let t1 = tokio::spawn(async move {
map_clone
.compute("key", async || {
tokio::time::sleep(Duration::from_millis(100)).await;
1
})
.await
});

// Give t1 time to insert the cell and start "computing"
tokio::time::sleep(Duration::from_millis(20)).await;

// Remove should return None because value is not ready
// And it removes the cell from the map.
assert_eq!(map.remove("key"), None);

// t1 finishes. It returns 1.
assert_eq!(t1.await.unwrap(), 1);

// The map should be empty now (key was removed)
assert_eq!(map.get("key"), None);
}

#[tokio::test]
async fn test_get_while_computing() {
let map = Arc::new(OnceMap::new());
let map_clone = map.clone();

let t1 = tokio::spawn(async move {
map_clone
.compute("key", async || {
tokio::time::sleep(Duration::from_millis(50)).await;
1
})
.await
});

tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(map.get("key"), None);

assert_eq!(t1.await.unwrap(), 1);
assert_eq!(map.get("key"), Some(1));
}

#[tokio::test]
async fn test_from_iter() {
let map: OnceMap<_, _> = vec![("a", 1), ("b", 2)].into_iter().collect();
assert_eq!(map.get("a"), Some(1));
assert_eq!(map.get("b"), Some(2));
assert_eq!(map.get("c"), None);
}

#[tokio::test]
async fn test_complex_key_value() {
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
struct Key(i32);

let map = OnceMap::new();
let v = map.compute(Key(1), async || "value".to_string()).await;
assert_eq!(v, "value");

assert_eq!(map.get(&Key(1)), Some("value".to_string()));
}
90 changes: 90 additions & 0 deletions mea/src/singleflight/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,93 @@ async fn test_panic_safe() {
let res = group.work("key", || async { "success".to_string() }).await;
assert_eq!(res, "success");
}

#[tokio::test]
async fn test_try_work_simple() {
let group = Group::new();
let res = group
.try_work("key", || async { Ok::<&str, ()>("val") })
.await;
assert_eq!(res, Ok("val"));

// Should be removed from map, so next call executes again
let res2 = group
.try_work("key", || async { Ok::<&str, ()>("val2") })
.await;
assert_eq!(res2, Ok("val2"));
}

#[tokio::test]
async fn test_try_work_coalescing() {
let group = Arc::new(Group::new());
let counter = Arc::new(AtomicUsize::new(0));

let mut handles = Vec::new();
for _ in 0..10 {
let group = group.clone();
let counter = counter.clone();
handles.push(tokio::spawn(async move {
group
.try_work("key", || async move {
tokio::time::sleep(Duration::from_millis(100)).await;
counter.fetch_add(1, Ordering::SeqCst);
Ok::<&str, ()>("val")
})
.await
}));
}

for handle in handles {
assert_eq!(handle.await.unwrap(), Ok("val"));
}

assert_eq!(counter.load(Ordering::SeqCst), 1);
}

#[tokio::test]
async fn test_try_work_failure() {
let group = Group::new();
let res = group
.try_work("key", || async { Err::<&str, &str>("error") })
.await;
assert_eq!(res, Err("error"));

// Retry should work
let res2 = group
.try_work("key", || async { Ok::<&str, ()>("success") })
.await;
assert_eq!(res2, Ok("success"));
}

#[tokio::test]
async fn test_try_work_wait_and_retry() {
let group = Arc::new(Group::new());
let counter = Arc::new(AtomicUsize::new(0));

let g1 = group.clone();
let c1 = counter.clone();
let h1 = tokio::spawn(async move {
g1.try_work("key", || async move {
c1.fetch_add(1, Ordering::SeqCst);
tokio::time::sleep(Duration::from_millis(100)).await;
Err::<&str, &str>("fail")
})
.await
});

let g2 = group.clone();
let c2 = counter.clone();
let h2 = tokio::spawn(async move {
// Ensure h1 starts first
tokio::time::sleep(Duration::from_millis(10)).await;
g2.try_work("key", || async move {
c2.fetch_add(1, Ordering::SeqCst);
Ok::<&str, ()>("success")
})
.await
});

assert_eq!(h1.await.unwrap(), Err("fail"));
assert_eq!(h2.await.unwrap(), Ok("success"));
assert_eq!(counter.load(Ordering::SeqCst), 2);
}