From 887ead86dc0baf1dd8325e0df9970fe0544b5c23 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 9 Jan 2026 11:00:05 +0800 Subject: [PATCH 1/2] feat: impl singleflight Signed-off-by: tison --- CHANGELOG.md | 4 ++++ mea/src/lib.rs | 1 + mea/src/singleflight/mod.rs | 1 + 3 files changed, 6 insertions(+) create mode 100644 mea/src/singleflight/mod.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 758f8d6..6422824 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. ## Unreleased +### New features + +* Implement `Singleflight` pattern for deduplicating concurrent requests. + ## [0.6.0] - 2025-01-04 ### Breaking changes diff --git a/mea/src/lib.rs b/mea/src/lib.rs index 45cdfbe..bf727fe 100644 --- a/mea/src/lib.rs +++ b/mea/src/lib.rs @@ -81,6 +81,7 @@ pub mod once; pub mod oneshot; pub mod rwlock; pub mod semaphore; +pub mod singleflight; pub mod shutdown; pub mod waitgroup; diff --git a/mea/src/singleflight/mod.rs b/mea/src/singleflight/mod.rs new file mode 100644 index 0000000..5fd4511 --- /dev/null +++ b/mea/src/singleflight/mod.rs @@ -0,0 +1 @@ +//! Singleflight provides a duplicate function call suppression mechanism. From 130402caf1d123ed88ee425c15e08ce274ec2464 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 9 Jan 2026 11:58:06 +0800 Subject: [PATCH 2/2] do impl Signed-off-by: tison --- README.md | 3 +- mea/src/lib.rs | 13 ++- mea/src/singleflight/mod.rs | 148 ++++++++++++++++++++++++++++++++++ mea/src/singleflight/tests.rs | 144 +++++++++++++++++++++++++++++++++ 4 files changed, 300 insertions(+), 8 deletions(-) create mode 100644 mea/src/singleflight/tests.rs diff --git a/README.md b/README.md index 52b85ff..57104ce 100644 --- a/README.md +++ b/README.md @@ -33,10 +33,11 @@ Mea (Make Easy Async) is a runtime-agnostic library providing essential synchron * [**ShutdownSend & ShutdownRecv**](https://docs.rs/mea/*/mea/shutdown/): A composite synchronization primitive for managing shutdown signals. * [**WaitGroup**](https://docs.rs/mea/*/mea/waitgroup/struct.WaitGroup.html): A synchronization primitive that allows waiting for multiple tasks to complete. * [**atomicbox**](https://docs.rs/mea/*/mea/atomicbox/): A safe, owning version of AtomicPtr for heap-allocated data. -* [**broadcast::channel**](https://docs.rs/mea/*/mea/broadcast/): A multi-producer, multi-consumer broadcast channel. +* [**broadcast**](https://docs.rs/mea/*/mea/broadcast/): A multi-producer, multi-consumer broadcast channel. * [**mpsc::bounded**](https://docs.rs/mea/*/mea/mpsc/fn.bounded.html): A multi-producer, single-consumer bounded queue for sending values between asynchronous tasks. * [**mpsc::unbounded**](https://docs.rs/mea/*/mea/mpsc/fn.unbounded.html): A multi-producer, single-consumer unbounded queue for sending values between asynchronous tasks. * [**oneshot::channel**](https://docs.rs/mea/*/mea/oneshot/): A one-shot channel for sending a single value between tasks. +* [**singleflight::Group**](https://docs.rs/mea/*/mea/singleflight/): A duplicate function call suppression mechanism. ## Installation diff --git a/mea/src/lib.rs b/mea/src/lib.rs index bf727fe..54725eb 100644 --- a/mea/src/lib.rs +++ b/mea/src/lib.rs @@ -15,13 +15,11 @@ #![cfg_attr(docsrs, feature(doc_cfg))] #![deny(missing_docs)] -//! # Mea - Make Easy Async -//! //! `mea` is a runtime-agnostic library providing essential synchronization primitives for //! asynchronous Rust programming. The library offers a collection of well-tested, efficient //! synchronization tools that work with any async runtime. //! -//! ## Features +//! # Features //! //! * [`Barrier`]: A synchronization point where multiple tasks can wait until all participants //! arrive @@ -37,20 +35,21 @@ //! shutdown signals //! * [`WaitGroup`]: A synchronization primitive that allows waiting for multiple tasks to complete //! * [`atomicbox`]: A safe, owning version of `AtomicPtr` for heap-allocated data. -//! * [`broadcast::channel`]: A multi-producer, multi-consumer broadcast channel. +//! * [`broadcast`]: A multi-producer, multi-consumer broadcast channel. //! * [`mpsc::bounded`]: A multi-producer, single-consumer bounded queue for sending values between //! asynchronous tasks. //! * [`mpsc::unbounded`]: A multi-producer, single-consumer unbounded queue for sending values //! between asynchronous tasks. //! * [`oneshot::channel`]: A one-shot channel for sending a single value between tasks. +//! * [`singleflight::Group`]: A duplicate function call suppression mechanism. //! -//! ## Runtime Agnostic +//! # Runtime Agnostic //! //! All synchronization primitives in this library are runtime-agnostic, meaning they can be used //! with any async runtime like Tokio, async-std, or others. This makes the library highly versatile //! and portable. //! -//! ## Thread Safety +//! # Thread Safety //! //! All types in this library implement `Send` and `Sync`, making them safe to share across thread //! boundaries. This is essential for concurrent programming where data needs to be accessed from @@ -81,8 +80,8 @@ pub mod once; pub mod oneshot; pub mod rwlock; pub mod semaphore; -pub mod singleflight; pub mod shutdown; +pub mod singleflight; pub mod waitgroup; #[cfg(test)] diff --git a/mea/src/singleflight/mod.rs b/mea/src/singleflight/mod.rs index 5fd4511..9c335d2 100644 --- a/mea/src/singleflight/mod.rs +++ b/mea/src/singleflight/mod.rs @@ -1 +1,149 @@ +// Copyright 2024 tison +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + //! Singleflight provides a duplicate function call suppression mechanism. + +use std::collections::HashMap; +use std::hash::Hash; +use std::sync::Arc; + +use crate::internal::Mutex; +use crate::once::OnceCell; + +#[cfg(test)] +mod tests; + +/// Group represents a class of work and forms a namespace in which +/// units of work can be executed with duplicate suppression. +#[derive(Debug)] +pub struct Group { + map: Mutex>>>, +} + +impl Default for Group +where + K: Eq + Hash + Clone, + V: Clone, +{ + fn default() -> Self { + Self::new() + } +} + +impl Group +where + K: Eq + Hash + Clone, + V: Clone, +{ + /// Creates a new Group. + pub fn new() -> Self { + Self { + map: Mutex::new(HashMap::new()), + } + } + + /// Executes and returns the results of the given function, making sure that only one execution + /// is in-flight for a given key at a time. + /// + /// If a duplicate comes in, the duplicate caller waits for the original to complete and + /// receives the same results. + /// + /// Once the function completes, the key, if not [`forgotten`], is removed from the group, + /// allowing future calls with the same key to execute the function again. + /// + /// [`forgotten`]: Self::forget + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use std::sync::atomic::AtomicUsize; + /// use std::sync::atomic::Ordering; + /// use std::time::Duration; + /// + /// use mea::singleflight::Group; + /// + /// # #[tokio::main] + /// # async fn main() { + /// let group = Group::new(); + /// let counter = Arc::new(AtomicUsize::new(0)); + /// + /// let c1 = counter.clone(); + /// let fut1 = group.work("key", || async move { + /// c1.fetch_add(1, Ordering::SeqCst); + /// // simulate heavy work to avoid immediate completion + /// tokio::time::sleep(Duration::from_millis(100)).await; + /// "result" + /// }); + /// + /// let c2 = counter.clone(); + /// let fut2 = group.work("key", || async move { + /// c2.fetch_add(1, Ordering::SeqCst); + /// // simulate heavy work to avoid immediate completion + /// tokio::time::sleep(Duration::from_millis(100)).await; + /// "result" + /// }); + /// + /// let (r1, r2) = tokio::join!(fut1, fut2); + /// + /// assert_eq!(r1, "result"); + /// assert_eq!(r2, "result"); + /// assert_eq!(counter.load(Ordering::SeqCst), 1); + /// # } + /// ``` + pub async fn work(&self, key: K, func: F) -> V + where + F: AsyncFnOnce() -> V, + { + // 1. Get or create the OnceCell. + let cell = { + let mut map = self.map.lock(); + map.entry(key.clone()) + .or_insert_with(|| Arc::new(OnceCell::new())) + .clone() + }; + + // 2. Try to initialize the cell. + // OnceCell::get_or_init guarantees that only one task executes the closure. + let res = cell + .get_or_init(async || { + // I am the leader. + let result = func().await; + + // Cleanup: remove the key from the map. + // We must ensure we remove the entry corresponding to *this* cell. + let mut map = self.map.lock(); + if let Some(existing) = map.get(&key) { + // Check if the map still points to our cell. + if Arc::ptr_eq(&cell, existing) { + map.remove(&key); + } + } + + result + }) + .await; + + res.clone() + } + + /// Forgets about the given key. + /// + /// Future calls to `work` for this key will call the function rather than waiting for an + /// earlier call to complete. Existing calls to `work` for this key are not affected. + pub fn forget(&self, key: &K) { + let mut map = self.map.lock(); + map.remove(key); + } +} diff --git a/mea/src/singleflight/tests.rs b/mea/src/singleflight/tests.rs new file mode 100644 index 0000000..19c41a9 --- /dev/null +++ b/mea/src/singleflight/tests.rs @@ -0,0 +1,144 @@ +// Copyright 2024 tison +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use crate::singleflight::Group; + +#[tokio::test] +async fn test_simple() { + let group = Group::new(); + let res = group.work("key", || async { "val" }).await; + assert_eq!(res, "val"); +} + +#[tokio::test] +async fn test_coalescing() { + let group = Arc::new(Group::new()); + let counter = Arc::new(AtomicUsize::new(0)); + + let mut handles = Vec::new(); + for _ in 0..10 { + let group = group.clone(); + let counter = counter.clone(); + handles.push(tokio::spawn(async move { + group + .work("key", || async move { + tokio::time::sleep(Duration::from_millis(100)).await; + counter.fetch_add(1, Ordering::SeqCst); + "val" + }) + .await + })); + } + + for handle in handles { + assert_eq!(handle.await.unwrap(), "val"); + } + + assert_eq!(counter.load(Ordering::SeqCst), 1); +} + +#[tokio::test] +async fn test_multiple_keys() { + let group = Arc::new(Group::new()); + let counter = Arc::new(AtomicUsize::new(0)); + + let g1 = group.clone(); + let c1 = counter.clone(); + let h1 = tokio::spawn(async move { + g1.work("key1", || async move { + tokio::time::sleep(Duration::from_millis(50)).await; + c1.fetch_add(1, Ordering::SeqCst); + "val1" + }) + .await + }); + + let g2 = group.clone(); + let c2 = counter.clone(); + let h2 = tokio::spawn(async move { + g2.work("key2", || async move { + tokio::time::sleep(Duration::from_millis(50)).await; + c2.fetch_add(1, Ordering::SeqCst); + "val2" + }) + .await + }); + + assert_eq!(h1.await.unwrap(), "val1"); + assert_eq!(h2.await.unwrap(), "val2"); + assert_eq!(counter.load(Ordering::SeqCst), 2); +} + +#[tokio::test] +async fn test_forget() { + let group = Arc::new(Group::new()); + let counter = Arc::new(AtomicUsize::new(0)); + + let g1 = group.clone(); + let c1 = counter.clone(); + let h1 = tokio::spawn(async move { + g1.work("key", || async move { + tokio::time::sleep(Duration::from_millis(100)).await; + c1.fetch_add(1, Ordering::SeqCst); + "val1" + }) + .await + }); + + // Wait a bit to ensure the first call is established + tokio::time::sleep(Duration::from_millis(10)).await; + group.forget(&"key"); + + let g2 = group.clone(); + let c2 = counter.clone(); + let h2 = tokio::spawn(async move { + g2.work("key", || async move { + tokio::time::sleep(Duration::from_millis(100)).await; + c2.fetch_add(1, Ordering::SeqCst); + "val2" + }) + .await + }); + + assert_eq!(h1.await.unwrap(), "val1"); + assert_eq!(h2.await.unwrap(), "val2"); + assert_eq!(counter.load(Ordering::SeqCst), 2); +} + +#[tokio::test] +async fn test_panic_safe() { + let group = Arc::new(Group::<&str, String>::new()); + + // Task that panics + let g1 = group.clone(); + let h1 = tokio::spawn(async move { + g1.work("key", || async { + panic!("oops"); + }) + .await + }); + + // Wait for h1 to panic and exit + let err = h1.await.unwrap_err(); + assert!(err.is_panic()); + + // Next task should succeed (new attempt) + let res = group.work("key", || async { "success".to_string() }).await; + assert_eq!(res, "success"); +}