Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.

## Unreleased

### New features

* Implement `Singleflight` pattern for deduplicating concurrent requests.

## [0.6.0] - 2025-01-04

### Breaking changes
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ Mea (Make Easy Async) is a runtime-agnostic library providing essential synchron
* [**ShutdownSend & ShutdownRecv**](https://docs.rs/mea/*/mea/shutdown/): A composite synchronization primitive for managing shutdown signals.
* [**WaitGroup**](https://docs.rs/mea/*/mea/waitgroup/struct.WaitGroup.html): A synchronization primitive that allows waiting for multiple tasks to complete.
* [**atomicbox**](https://docs.rs/mea/*/mea/atomicbox/): A safe, owning version of AtomicPtr for heap-allocated data.
* [**broadcast::channel**](https://docs.rs/mea/*/mea/broadcast/): A multi-producer, multi-consumer broadcast channel.
* [**broadcast**](https://docs.rs/mea/*/mea/broadcast/): A multi-producer, multi-consumer broadcast channel.
* [**mpsc::bounded**](https://docs.rs/mea/*/mea/mpsc/fn.bounded.html): A multi-producer, single-consumer bounded queue for sending values between asynchronous tasks.
* [**mpsc::unbounded**](https://docs.rs/mea/*/mea/mpsc/fn.unbounded.html): A multi-producer, single-consumer unbounded queue for sending values between asynchronous tasks.
* [**oneshot::channel**](https://docs.rs/mea/*/mea/oneshot/): A one-shot channel for sending a single value between tasks.
* [**singleflight::Group**](https://docs.rs/mea/*/mea/singleflight/): A duplicate function call suppression mechanism.

## Installation

Expand Down
12 changes: 6 additions & 6 deletions mea/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(missing_docs)]

//! # Mea - Make Easy Async
//!
//! `mea` is a runtime-agnostic library providing essential synchronization primitives for
//! asynchronous Rust programming. The library offers a collection of well-tested, efficient
//! synchronization tools that work with any async runtime.
//!
//! ## Features
//! # Features
//!
//! * [`Barrier`]: A synchronization point where multiple tasks can wait until all participants
//! arrive
Expand All @@ -37,20 +35,21 @@
//! shutdown signals
//! * [`WaitGroup`]: A synchronization primitive that allows waiting for multiple tasks to complete
//! * [`atomicbox`]: A safe, owning version of `AtomicPtr` for heap-allocated data.
//! * [`broadcast::channel`]: A multi-producer, multi-consumer broadcast channel.
//! * [`broadcast`]: A multi-producer, multi-consumer broadcast channel.
//! * [`mpsc::bounded`]: A multi-producer, single-consumer bounded queue for sending values between
//! asynchronous tasks.
//! * [`mpsc::unbounded`]: A multi-producer, single-consumer unbounded queue for sending values
//! between asynchronous tasks.
//! * [`oneshot::channel`]: A one-shot channel for sending a single value between tasks.
//! * [`singleflight::Group`]: A duplicate function call suppression mechanism.
//!
//! ## Runtime Agnostic
//! # Runtime Agnostic
//!
//! All synchronization primitives in this library are runtime-agnostic, meaning they can be used
//! with any async runtime like Tokio, async-std, or others. This makes the library highly versatile
//! and portable.
//!
//! ## Thread Safety
//! # Thread Safety
//!
//! All types in this library implement `Send` and `Sync`, making them safe to share across thread
//! boundaries. This is essential for concurrent programming where data needs to be accessed from
Expand Down Expand Up @@ -82,6 +81,7 @@ pub mod oneshot;
pub mod rwlock;
pub mod semaphore;
pub mod shutdown;
pub mod singleflight;
pub mod waitgroup;

#[cfg(test)]
Expand Down
149 changes: 149 additions & 0 deletions mea/src/singleflight/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2024 tison <wander4096@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Singleflight provides a duplicate function call suppression mechanism.

use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;

use crate::internal::Mutex;
use crate::once::OnceCell;

#[cfg(test)]
mod tests;

/// Group represents a class of work and forms a namespace in which
/// units of work can be executed with duplicate suppression.
#[derive(Debug)]
pub struct Group<K, V> {
map: Mutex<HashMap<K, Arc<OnceCell<V>>>>,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be improved with a more efficient concurrent hashmap. But Go's impl uses a simple Mutex as well, so I think it's generally OK.

We can leave using other hashmap impls (and thus extra deps) as a follow-up.

}

impl<K, V> Default for Group<K, V>
where
K: Eq + Hash + Clone,
V: Clone,
{
fn default() -> Self {
Self::new()
}
}

impl<K, V> Group<K, V>
where
K: Eq + Hash + Clone,
V: Clone,
{
/// Creates a new, empty `Group` with no keys registered.
pub fn new() -> Self {
    let map = Mutex::new(HashMap::new());
    Self { map }
}

/// Executes and returns the results of the given function, making sure that only one execution
/// is in-flight for a given key at a time.
///
/// If a duplicate comes in, the duplicate caller waits for the original to complete and
/// receives the same results.
///
/// Once the function completes, the key, if not [`forgotten`], is removed from the group,
/// allowing future calls with the same key to execute the function again.
///
/// [`forgotten`]: Self::forget
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use std::sync::atomic::AtomicUsize;
/// use std::sync::atomic::Ordering;
/// use std::time::Duration;
///
/// use mea::singleflight::Group;
///
/// # #[tokio::main]
/// # async fn main() {
/// let group = Group::new();
/// let counter = Arc::new(AtomicUsize::new(0));
///
/// let c1 = counter.clone();
/// let fut1 = group.work("key", || async move {
/// c1.fetch_add(1, Ordering::SeqCst);
/// // simulate heavy work to avoid immediate completion
/// tokio::time::sleep(Duration::from_millis(100)).await;
/// "result"
/// });
///
/// let c2 = counter.clone();
/// let fut2 = group.work("key", || async move {
/// c2.fetch_add(1, Ordering::SeqCst);
/// // simulate heavy work to avoid immediate completion
/// tokio::time::sleep(Duration::from_millis(100)).await;
/// "result"
/// });
///
/// let (r1, r2) = tokio::join!(fut1, fut2);
///
/// assert_eq!(r1, "result");
/// assert_eq!(r2, "result");
/// assert_eq!(counter.load(Ordering::SeqCst), 1);
/// # }
/// ```
pub async fn work<F>(&self, key: K, func: F) -> V
where
F: AsyncFnOnce() -> V,
{
// 1. Get or create the OnceCell.
let cell = {
let mut map = self.map.lock();
map.entry(key.clone())
.or_insert_with(|| Arc::new(OnceCell::new()))
.clone()
};

// 2. Try to initialize the cell.
// OnceCell::get_or_init guarantees that only one task executes the closure.
let res = cell
.get_or_init(async || {
// I am the leader.
let result = func().await;

// Cleanup: remove the key from the map.
// We must ensure we remove the entry corresponding to *this* cell.
let mut map = self.map.lock();
if let Some(existing) = map.get(&key) {
// Check if the map still points to our cell.
if Arc::ptr_eq(&cell, existing) {
map.remove(&key);
}
}

result
})
.await;

res.clone()
Comment on lines +119 to +138
Copy link
Collaborator Author

@tisonkun tisonkun Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative impl method is to build the sync block manually, with potentially registered oneshot channels.

In that case, we can reduce one .clone() if there is no other waiter, and the wakeup logic can be done in batch.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I investigated the OnceCell impl in mea and found a potential problem:

1. The leader computes the result.
2. The leader removes the key from `Group.map`.
3. The OnceCell has not yet run `set_value`, so `value_set` is still false.
4. Newly arriving calls see that the key is not in the map -> create a new OnceCell -> become the new leader.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW: This is not a concurrency safety issue, but a semantic safety issue.

Copy link
Collaborator Author

@tisonkun tisonkun Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me see ...

I've tried the oneshot channels solution, but it can hardly handle the case where the first leader panics. So I tend to keep the OnceCell solution for now, at the cost of one extra clone of the result.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I investigate the OnceCell impl in mea, find some potential problems:

1. leader computed result
2. leader remove key from `Group.map`
3. but OnceCell not yet soon set_value, value_set still false
4. new coming calls see key not in map -> create a new OnceCell -> become new leader

This should not be an issue, because you can consider the call that comes at step 4 to "happen after" the result was set and delivered.

Singleflight is not a cache, for sure.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup

}

/// Removes the entry for `key` from the group, if there is one.
///
/// A later call to `work` with this key finds no entry and therefore runs its own function
/// instead of joining an earlier call. Calls to `work` that already obtained the entry are
/// not affected.
pub fn forget(&self, key: &K) {
    // The lock guard is a temporary, released as soon as the removal is done.
    self.map.lock().remove(key);
}
}
144 changes: 144 additions & 0 deletions mea/src/singleflight/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2024 tison <wander4096@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;

use crate::singleflight::Group;

// A single call with no contention runs the function and returns its value.
#[tokio::test]
async fn test_simple() {
    let group = Group::new();
    let value = group.work("key", || async { "val" }).await;
    assert_eq!(value, "val");
}

#[tokio::test]
async fn test_coalescing() {
let group = Arc::new(Group::new());
let counter = Arc::new(AtomicUsize::new(0));

let mut handles = Vec::new();
for _ in 0..10 {
let group = group.clone();
let counter = counter.clone();
handles.push(tokio::spawn(async move {
group
.work("key", || async move {
tokio::time::sleep(Duration::from_millis(100)).await;
counter.fetch_add(1, Ordering::SeqCst);
"val"
})
.await
}));
}

for handle in handles {
assert_eq!(handle.await.unwrap(), "val");
}

assert_eq!(counter.load(Ordering::SeqCst), 1);
}

#[tokio::test]
async fn test_multiple_keys() {
let group = Arc::new(Group::new());
let counter = Arc::new(AtomicUsize::new(0));

let g1 = group.clone();
let c1 = counter.clone();
let h1 = tokio::spawn(async move {
g1.work("key1", || async move {
tokio::time::sleep(Duration::from_millis(50)).await;
c1.fetch_add(1, Ordering::SeqCst);
"val1"
})
.await
});

let g2 = group.clone();
let c2 = counter.clone();
let h2 = tokio::spawn(async move {
g2.work("key2", || async move {
tokio::time::sleep(Duration::from_millis(50)).await;
c2.fetch_add(1, Ordering::SeqCst);
"val2"
})
.await
});

assert_eq!(h1.await.unwrap(), "val1");
assert_eq!(h2.await.unwrap(), "val2");
assert_eq!(counter.load(Ordering::SeqCst), 2);
}

#[tokio::test]
async fn test_forget() {
let group = Arc::new(Group::new());
let counter = Arc::new(AtomicUsize::new(0));

let g1 = group.clone();
let c1 = counter.clone();
let h1 = tokio::spawn(async move {
g1.work("key", || async move {
tokio::time::sleep(Duration::from_millis(100)).await;
c1.fetch_add(1, Ordering::SeqCst);
"val1"
})
.await
});

// Wait a bit to ensure the first call is established
tokio::time::sleep(Duration::from_millis(10)).await;
group.forget(&"key");

let g2 = group.clone();
let c2 = counter.clone();
let h2 = tokio::spawn(async move {
g2.work("key", || async move {
tokio::time::sleep(Duration::from_millis(100)).await;
c2.fetch_add(1, Ordering::SeqCst);
"val2"
})
.await
});

assert_eq!(h1.await.unwrap(), "val1");
assert_eq!(h2.await.unwrap(), "val2");
assert_eq!(counter.load(Ordering::SeqCst), 2);
}

#[tokio::test]
async fn test_panic_safe() {
let group = Arc::new(Group::<&str, String>::new());

// Task that panics
let g1 = group.clone();
let h1 = tokio::spawn(async move {
g1.work("key", || async {
panic!("oops");
})
.await
});

// Wait for h1 to panic and exit
let err = h1.await.unwrap_err();
assert!(err.is_panic());

// Next task should succeed (new attempt)
let res = group.work("key", || async { "success".to_string() }).await;
assert_eq!(res, "success");
}