Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ All notable changes to this project will be documented in this file.

### New features

* Implement `once::OnceMap` to run computation only once and store the results in a hash map.
* `singleflight::Group` now supports custom hashers for keys.
* `singleflight::Group::remove` now accepts any `&Q` where `Q: ?Sized + Hash + Eq` and `K: Borrow<Q>` aligning with standard HashMap's interface.

## [0.6.1] - 2026-01-11

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Mea (Make Easy Async) is a runtime-agnostic library providing essential synchron
* [**Mutex**](https://docs.rs/mea/*/mea/mutex/struct.Mutex.html): A mutual exclusion primitive for protecting shared data.
* [**Once**](https://docs.rs/mea/*/mea/once/struct.Once.html): A primitive that ensures a one-time asynchronous operation runs at most once, even when called concurrently.
* [**OnceCell**](https://docs.rs/mea/*/mea/once/struct.OnceCell.html): A cell that can be written to at most once, providing safe, lazy initialization.
* [**OnceMap**](https://docs.rs/mea/*/mea/once/struct.OnceMap.html): A hash map that runs computation only once for each key and stores the result.
* [**RwLock**](https://docs.rs/mea/*/mea/rwlock/struct.RwLock.html): A reader-writer lock that allows multiple readers or a single writer at a time.
* [**Semaphore**](https://docs.rs/mea/*/mea/semaphore/struct.Semaphore.html): A synchronization primitive that controls access to a shared resource.
* [**ShutdownSend & ShutdownRecv**](https://docs.rs/mea/*/mea/shutdown/): A composite synchronization primitive for managing shutdown signals.
Expand Down Expand Up @@ -74,6 +75,7 @@ This crate collects runtime-agnostic synchronization primitives from spare parts
* **Latch** is inspired by [`latches`](https://github.com/mirromutth/latches), with a different implementation based on the internal `CountdownState` primitive. No `wait` or `watch` method is provided, since it can be easily implemented by [composing delay futures](https://docs.rs/fastimer/*/fastimer/fn.timeout.html). No sync variant is provided, since it can be easily implemented with block_on of any runtime.
* **Mutex** is derived from `tokio::sync::Mutex`. No blocking method is provided, since it can be easily implemented with block_on of any runtime.
* **OnceCell** is derived from `tokio::sync::OnceCell`, but using our own semaphore implementation.
* **OnceMap** is inspired by `uv-once-map` but the interface and implementation are redesigned.
* **RwLock** is derived from `tokio::sync::RwLock`, but the `max_readers` can be any `NonZeroUsize` (effectively any positive `usize`) instead of `[0, u32::MAX >> 3]`. No blocking method is provided, since it can be easily implemented with block_on of any runtime.
* **Semaphore** is derived from `tokio::sync::Semaphore`, without `close` method since it is quite tricky to use. And thus, this semaphore doesn't have the limitation of max permits. Besides, new methods like `forget_exact` are added to fit the specific use case.
* **WaitGroup** is inspired by [`waitgroup-rs`](https://github.com/laizy/waitgroup-rs), providing different API flavor with a different implementation based on the internal `CountdownState` primitive.
Expand Down
2 changes: 2 additions & 0 deletions mea/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
//! * [`Once`]: A primitive that ensures a one-time asynchronous operation runs at most once, even
//! when called concurrently
//! * [`OnceCell`]: A cell that can be written to at most once and provides safe concurrent access
//! * [`OnceMap`]: A hash map that runs computation only once for each key and stores the result.
//! * [`RwLock`]: A reader-writer lock that allows multiple readers or a single writer at a time
//! * [`Semaphore`]: A synchronization primitive that controls access to a shared resource
//! * [`ShutdownSend`] & [`ShutdownRecv`]: A composite synchronization primitive for managing
Expand Down Expand Up @@ -61,6 +62,7 @@
//! [`Mutex`]: mutex::Mutex
//! [`Once`]: once::Once
//! [`OnceCell`]: once::OnceCell
//! [`OnceMap`]: once::OnceMap
//! [`RwLock`]: rwlock::RwLock
//! [`Semaphore`]: semaphore::Semaphore
//! [`ShutdownSend`]: shutdown::ShutdownSend
Expand Down
5 changes: 2 additions & 3 deletions mea/src/once/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
#[allow(clippy::module_inception)]
mod once;
mod once_cell;
mod once_map;

pub use self::once::Once;
pub use self::once_cell::OnceCell;

#[cfg(test)]
mod tests;
pub use self::once_map::OnceMap;
3 changes: 3 additions & 0 deletions mea/src/once/once.rs → mea/src/once/once/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ use std::task::Poll;
use crate::internal::CountdownState;
use crate::semaphore::Semaphore;

#[cfg(test)]
mod tests;

/// A synchronization primitive which can be used to run a one-time async initialization.
///
/// Unlike [`std::sync::Once`], this type never blocks a thread. The provided closure must
Expand Down
179 changes: 179 additions & 0 deletions mea/src/once/once/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright 2024 tison <wander4096@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;

use tokio_test::assert_ready;

use super::*;
use crate::latch::Latch;
use crate::test_runtime;

#[tokio::test]
async fn test_call_once_runs_only_once() {
static ONCE: Once = Once::new();
static COUNTER: AtomicUsize = AtomicUsize::new(0);

assert!(!ONCE.is_completed());

ONCE.call_once(async || {
COUNTER.fetch_add(1, Ordering::SeqCst);
})
.await;

assert!(ONCE.is_completed());
assert_eq!(COUNTER.load(Ordering::SeqCst), 1);

// Second call should not run the closure
ONCE.call_once(async || {
COUNTER.fetch_add(1, Ordering::SeqCst);
})
.await;

assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
}

#[test]
fn test_once_multi_task() {
static ONCE: Once = Once::new();
static COUNTER: AtomicUsize = AtomicUsize::new(0);

test_runtime().block_on(async {
const N: usize = 100;

let latch = Arc::new(Latch::new(N as u32));
let mut handles = Vec::with_capacity(N);

for _ in 0..N {
let latch = latch.clone();
handles.push(tokio::spawn(async move {
ONCE.call_once(async || {
COUNTER.fetch_add(1, Ordering::SeqCst);
})
.await;
latch.count_down();
}));
}

latch.wait().await;

for handle in handles {
handle.await.unwrap();
}

// Only one task should have incremented the counter
assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
assert!(ONCE.is_completed());
});
}

#[tokio::test]
async fn test_once_cancelled() {
static ONCE: Once = Once::new();
static COUNTER: AtomicUsize = AtomicUsize::new(0);

let handle1 = tokio::spawn(async {
let fut = ONCE.call_once(async || {
tokio::time::sleep(Duration::from_millis(1000)).await;
COUNTER.fetch_add(1, Ordering::SeqCst);
});
let timeout = tokio::time::timeout(Duration::from_millis(1), fut).await;
assert!(timeout.is_err());
});

let handle2 = tokio::spawn(async {
tokio::time::sleep(Duration::from_millis(100)).await;
ONCE.call_once(async || {
COUNTER.fetch_add(10, Ordering::SeqCst);
})
.await;
});

handle1.await.unwrap();
handle2.await.unwrap();

// The second task should have run since the first was cancelled
assert_eq!(COUNTER.load(Ordering::SeqCst), 10);
assert!(ONCE.is_completed());
}

#[tokio::test]
async fn test_once_debug() {
let once = Once::new();
let debug_str = format!("{:?}", once);
assert!(debug_str.contains("Once"));
assert!(debug_str.contains("done"));
assert!(debug_str.contains("false"));

once.call_once(async || {}).await;

let debug_str = format!("{:?}", once);
assert!(debug_str.contains("true"));
}

#[tokio::test]
async fn test_once_default() {
let once = Once::default();
assert!(!once.is_completed());
}

#[tokio::test]
async fn test_once_retry_after_panic() {
static ONCE: Once = Once::new();
static COUNTER: AtomicUsize = AtomicUsize::new(0);

let handle = tokio::spawn(async {
ONCE.call_once(async || {
COUNTER.fetch_add(1, Ordering::SeqCst);
panic!("boom");
})
.await;
});

let err = handle.await.expect_err("once init should panic");
assert!(err.is_panic());

ONCE.call_once(async || {
COUNTER.fetch_add(1, Ordering::SeqCst);
})
.await;

assert_eq!(COUNTER.load(Ordering::SeqCst), 2);
assert!(ONCE.is_completed());
}

#[tokio::test]
async fn test_once_wait() {
// wait after call_once completed
{
let once = Once::new();
once.call_once(async || {}).await;
assert_ready!(tokio_test::task::spawn(once.wait()).poll());
}

// wait before call_once completed
{
static ONCE: Once = Once::new();
let handle = tokio::spawn(async {
ONCE.wait().await;
});

tokio::time::sleep(Duration::from_millis(100)).await;
ONCE.call_once(async || {}).await;
handle.await.unwrap();
}
}
3 changes: 3 additions & 0 deletions mea/src/once/once_cell.rs → mea/src/once/once_cell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ use std::sync::atomic::Ordering;
use crate::semaphore::Semaphore;
use crate::semaphore::SemaphorePermit;

#[cfg(test)]
mod tests;

/// A thread-safe cell which can nominally be written to only once.
///
/// # Examples
Expand Down
Loading