From c3d6c16d31d90e046eb752d8086077bb1b35e3f5 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 21 Jan 2026 13:01:27 +0800 Subject: [PATCH] chore: catch up tidy and tests Signed-off-by: tison --- CHANGELOG.md | 2 +- mea/src/lib.rs | 6 ++ mea/src/once/once_map/mod.rs | 2 +- mea/src/once/once_map/tests.rs | 115 +++++++++++++++++++++++++++++++++ mea/src/singleflight/tests.rs | 90 ++++++++++++++++++++++++++ 5 files changed, 213 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 146abe3..2ba6f00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ All notable changes to this project will be documented in this file. * Implement `once::OnceMap` to run computation only once and store the results in a hash map. * `singleflight::Group` now supports custom hashers for keys. -* `singleflight::Group::remove` now accepts any `&Q` where `Q: ?Sized + Hash + Eq` and `K: Borrow` aligning with standard HashMap's interface. +* `singleflight::Group::forget` now accepts any `&Q` where `Q: ?Sized + Hash + Eq` and `K: Borrow` aligning with standard HashMap's interface. ## [0.6.1] - 2026-01-11 diff --git a/mea/src/lib.rs b/mea/src/lib.rs index 111868b..f981bfd 100644 --- a/mea/src/lib.rs +++ b/mea/src/lib.rs @@ -106,6 +106,7 @@ mod tests { use crate::mutex::MutexGuard; use crate::once::Once; use crate::once::OnceCell; + use crate::once::OnceMap; use crate::oneshot; use crate::rwlock::RwLock; use crate::rwlock::RwLockReadGuard; @@ -113,6 +114,7 @@ mod tests { use crate::semaphore::Semaphore; use crate::shutdown::ShutdownRecv; use crate::shutdown::ShutdownSend; + use crate::singleflight; use crate::waitgroup::Wait; use crate::waitgroup::WaitGroup; @@ -123,6 +125,8 @@ mod tests { do_assert_send_and_sync::(); do_assert_send_and_sync::(); do_assert_send_and_sync::>(); + do_assert_send_and_sync::>(); + do_assert_send_and_sync::>(); do_assert_send_and_sync::(); do_assert_send_and_sync::(); do_assert_send_and_sync::(); @@ -161,6 +165,8 @@ mod tests { do_assert_unpin::(); do_assert_unpin::(); do_assert_unpin::>(); + do_assert_unpin::>(); + do_assert_unpin::>(); do_assert_unpin::(); do_assert_unpin::(); do_assert_unpin::(); diff --git a/mea/src/once/once_map/mod.rs b/mea/src/once/once_map/mod.rs index 1515bb1..b4087f1 100644 --- a/mea/src/once/once_map/mod.rs +++ b/mea/src/once/once_map/mod.rs @@ -145,7 +145,7 @@ where /// Remove the given key from the map. /// - /// If you need to get the value that has been remove, use the [`remove`] method instead. + /// If you need to get the value that has been removed, use the [`remove`] method instead. /// /// [`remove`]: Self::remove pub fn discard(&self, key: &Q) diff --git a/mea/src/once/once_map/tests.rs b/mea/src/once/once_map/tests.rs index a6749aa..b329cb2 100644 --- a/mea/src/once/once_map/tests.rs +++ b/mea/src/once/once_map/tests.rs @@ -12,13 +12,28 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::hash_map::RandomState; use std::sync::Arc; +use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::time::Duration; use crate::once::OnceMap; +#[test] +fn test_default_and_constructors() { + let _map: OnceMap = OnceMap::default(); + let _: OnceMap = OnceMap::new(); + let _: OnceMap = OnceMap::with_capacity(10); + let _: OnceMap = OnceMap::with_hasher(RandomState::new()); + let _: OnceMap = OnceMap::with_capacity_and_hasher(10, RandomState::new()); + + // Check capacity (indirectly via debug or just ensure it runs) + let map: OnceMap = OnceMap::with_capacity(100); + assert!(format!("{:?}", map).contains("OnceMap")); +} + #[tokio::test] async fn test_compute() { let map = OnceMap::new(); @@ -70,6 +85,45 @@ async fn test_try_compute() { assert_eq!(res, Ok(1)); } +#[tokio::test] +async fn test_try_compute_concurrent_failure_then_success() { + let map = Arc::new(OnceMap::new()); + let success = Arc::new(AtomicBool::new(false)); + let map_clone = map.clone(); + let success_clone = success.clone(); + + // Spawn a task that fails + let t1 = tokio::spawn(async move { + map_clone + .try_compute("key", async move || { + tokio::time::sleep(Duration::from_millis(50)).await; + Err::("fail") + }) + .await + }); + + // Spawn a task that succeeds, but starts slightly later/runs concurrent + let map_clone2 = map.clone(); + let t2 = tokio::spawn(async move { + // Wait for t1 to start + tokio::time::sleep(Duration::from_millis(10)).await; + // This should block until t1 fails, then retry (conceptually) + map_clone2 + .try_compute("key", async move || { + success_clone.store(true, Ordering::SeqCst); + Ok::(1) + }) + .await + }); + + let res1 = t1.await.unwrap(); + assert_eq!(res1, Err("fail")); + + let res2 = t2.await.unwrap(); + assert_eq!(res2, Ok(1)); + assert!(success.load(Ordering::SeqCst)); +} + #[tokio::test] async fn test_get_remove() { let map = OnceMap::new(); @@ -88,6 +142,55 @@ async fn test_get_remove() { assert_eq!(map.get("key"), None); } +#[tokio::test] +async fn test_remove_while_computing() { + let map = Arc::new(OnceMap::new()); + let map_clone = map.clone(); + + let t1 = tokio::spawn(async move { + map_clone + .compute("key", async || { + tokio::time::sleep(Duration::from_millis(100)).await; + 1 + }) + .await + }); + + // Give t1 time to insert the cell and start "computing" + tokio::time::sleep(Duration::from_millis(20)).await; + + // Remove should return None because value is not ready + // And it removes the cell from the map. + assert_eq!(map.remove("key"), None); + + // t1 finishes. It returns 1. + assert_eq!(t1.await.unwrap(), 1); + + // The map should be empty now (key was removed) + assert_eq!(map.get("key"), None); +} + +#[tokio::test] +async fn test_get_while_computing() { + let map = Arc::new(OnceMap::new()); + let map_clone = map.clone(); + + let t1 = tokio::spawn(async move { + map_clone + .compute("key", async || { + tokio::time::sleep(Duration::from_millis(50)).await; + 1 + }) + .await + }); + + tokio::time::sleep(Duration::from_millis(10)).await; + assert_eq!(map.get("key"), None); + + assert_eq!(t1.await.unwrap(), 1); + assert_eq!(map.get("key"), Some(1)); +} + #[tokio::test] async fn test_from_iter() { let map: OnceMap<_, _> = vec![("a", 1), ("b", 2)].into_iter().collect(); @@ -95,3 +198,15 @@ async fn test_from_iter() { assert_eq!(map.get("b"), Some(2)); assert_eq!(map.get("c"), None); } + +#[tokio::test] +async fn test_complex_key_value() { + #[derive(Hash, PartialEq, Eq, Clone, Debug)] + struct Key(i32); + + let map = OnceMap::new(); + let v = map.compute(Key(1), async || "value".to_string()).await; + assert_eq!(v, "value"); + + assert_eq!(map.get(&Key(1)), Some("value".to_string())); +} diff --git a/mea/src/singleflight/tests.rs b/mea/src/singleflight/tests.rs index 19c41a9..1260231 100644 --- a/mea/src/singleflight/tests.rs +++ b/mea/src/singleflight/tests.rs @@ -142,3 +142,93 @@ async fn test_panic_safe() { let res = group.work("key", || async { "success".to_string() }).await; assert_eq!(res, "success"); } + +#[tokio::test] +async fn test_try_work_simple() { + let group = Group::new(); + let res = group + .try_work("key", || async { Ok::<&str, ()>("val") }) + .await; + assert_eq!(res, Ok("val")); + + // Should be removed from map, so next call executes again + let res2 = group + .try_work("key", || async { Ok::<&str, ()>("val2") }) + .await; + assert_eq!(res2, Ok("val2")); +} + +#[tokio::test] +async fn test_try_work_coalescing() { + let group = Arc::new(Group::new()); + let counter = Arc::new(AtomicUsize::new(0)); + + let mut handles = Vec::new(); + for _ in 0..10 { + let group = group.clone(); + let counter = counter.clone(); + handles.push(tokio::spawn(async move { + group + .try_work("key", || async move { + tokio::time::sleep(Duration::from_millis(100)).await; + counter.fetch_add(1, Ordering::SeqCst); + Ok::<&str, ()>("val") + }) + .await + })); + } + + for handle in handles { + assert_eq!(handle.await.unwrap(), Ok("val")); + } + + assert_eq!(counter.load(Ordering::SeqCst), 1); +} + +#[tokio::test] +async fn test_try_work_failure() { + let group = Group::new(); + let res = group + .try_work("key", || async { Err::<&str, &str>("error") }) + .await; + assert_eq!(res, Err("error")); + + // Retry should work + let res2 = group + .try_work("key", || async { Ok::<&str, ()>("success") }) + .await; + assert_eq!(res2, Ok("success")); +} + +#[tokio::test] +async fn test_try_work_wait_and_retry() { + let group = Arc::new(Group::new()); + let counter = Arc::new(AtomicUsize::new(0)); + + let g1 = group.clone(); + let c1 = counter.clone(); + let h1 = tokio::spawn(async move { + g1.try_work("key", || async move { + c1.fetch_add(1, Ordering::SeqCst); + tokio::time::sleep(Duration::from_millis(100)).await; + Err::<&str, &str>("fail") + }) + .await + }); + + let g2 = group.clone(); + let c2 = counter.clone(); + let h2 = tokio::spawn(async move { + // Ensure h1 starts first + tokio::time::sleep(Duration::from_millis(10)).await; + g2.try_work("key", || async move { + c2.fetch_add(1, Ordering::SeqCst); + Ok::<&str, ()>("success") + }) + .await + }); + + assert_eq!(h1.await.unwrap(), Err("fail")); + assert_eq!(h2.await.unwrap(), Ok("success")); + assert_eq!(counter.load(Ordering::SeqCst), 2); +}