From 87522b71d5dac399e84f6f3cdb2306d61b173fb5 Mon Sep 17 00:00:00 2001 From: Patrik Cyvoct Date: Thu, 22 Jan 2026 21:58:59 +0100 Subject: [PATCH 1/3] Add configurable behavior for cancelled get() calls Signed-off-by: Patrik Cyvoct --- fastpool/src/bounded.rs | 51 ++++- fastpool/src/common.rs | 11 + fastpool/src/lib.rs | 1 + fastpool/src/unbounded.rs | 41 +++- fastpool/tests/cancellation_tests.rs | 296 +++++++++++++++++++++++++++ 5 files changed, 395 insertions(+), 5 deletions(-) create mode 100644 fastpool/tests/cancellation_tests.rs diff --git a/fastpool/src/bounded.rs b/fastpool/src/bounded.rs index 3b08c7b..b701e9b 100644 --- a/fastpool/src/bounded.rs +++ b/fastpool/src/bounded.rs @@ -83,6 +83,7 @@ use std::sync::atomic::Ordering; use mea::semaphore::OwnedSemaphorePermit; use mea::semaphore::Semaphore; +use crate::CancellationBehavior; use crate::ManageObject; use crate::ObjectStatus; use crate::QueueStrategy; @@ -101,6 +102,9 @@ pub struct PoolConfig { /// /// Determines the order of objects being queued and dequeued. pub queue_strategy: QueueStrategy, + + /// Behavior when a `get()` call is cancelled + pub cancellation_behavior: CancellationBehavior, } impl PoolConfig { @@ -109,6 +113,7 @@ impl PoolConfig { Self { max_size, queue_strategy: QueueStrategy::default(), + cancellation_behavior: CancellationBehavior::default(), } } @@ -117,6 +122,12 @@ impl PoolConfig { self.queue_strategy = queue_strategy; self } + + /// Returns a new [`PoolConfig`] with the specified cancellation behavior. + pub fn with_cancellation_behavior(mut self, cancellation_behavior: CancellationBehavior) -> Self { + self.cancellation_behavior = cancellation_behavior; + self + } } /// The current pool status. @@ -310,6 +321,7 @@ impl Pool { let mut unready_object = UnreadyObject { state: Some(object), pool: Arc::downgrade(self), + cancellation_behavior: self.config.cancellation_behavior, }; let state = unready_object.state(); @@ -323,6 +335,10 @@ impl Pool { state.status.recycle_count += 1; state.status.recycled = Some(std::time::Instant::now()); break unready_object.ready(permit); + } else { + // We need to manually detach here as the drop implementation + // depends on the cancellation behaviour + unready_object.detach(); } } }; @@ -396,6 +412,13 @@ impl Pool { } fn push_back(&self, o: ObjectState) { + self.return_to_pool(o); + self.users.fetch_sub(1, Ordering::Relaxed); + } + + /// This is used when an UnreadyObject is dropped during cancellation, + /// since the scopeguard in get() will handle decrementing users. + fn return_to_pool(&self, o: ObjectState) { let mut slots = self.slots.lock(); assert!( @@ -406,9 +429,6 @@ impl Pool { ); slots.deque.push_back(o); - drop(slots); - - self.users.fetch_sub(1, Ordering::Relaxed); } fn detach_object(&self, o: &mut M::Object, ready: bool) { @@ -517,14 +537,37 @@ impl Object { } } -/// A wrapper of ObjectStatus that detaches the object from the pool when dropped. +/// A wrapper of ObjectState used during the `is_recyclable` check in `Pool::get`. +/// +/// If the check passes, the object is converted to a ready `Object` via `ready()`. +/// If the check fails, `detach()` should be called to permanently remove the object +/// from the pool. If dropped without calling either method (due to cancellation), +/// the behavior depends on the pool's [`CancellationBehavior`] configuration. struct UnreadyObject { state: Option>, pool: Weak>, + cancellation_behavior: CancellationBehavior, } impl Drop for UnreadyObject { fn drop(&mut self) { + if let Some(mut state) = self.state.take() { + if let Some(pool) = self.pool.upgrade() { + match self.cancellation_behavior { + CancellationBehavior::Detach => { + pool.detach_object(&mut state.o, false); + } + CancellationBehavior::ReturnToPool => { + pool.return_to_pool(state); + } + } + } + } + } +} + +impl UnreadyObject { + fn detach(&mut self) { if let Some(mut state) = self.state.take() { if let Some(pool) = self.pool.upgrade() { pool.detach_object(&mut state.o, false); diff --git a/fastpool/src/common.rs b/fastpool/src/common.rs index f5a97eb..54af7bb 100644 --- a/fastpool/src/common.rs +++ b/fastpool/src/common.rs @@ -90,3 +90,14 @@ pub enum QueueStrategy { /// This strategy behaves like a stack. Lifo, } + +/// Behavior when a `get()` call is cancelled during the `is_recyclable()` check. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub enum CancellationBehavior { + /// Detach the object from the pool (default). + #[default] + Detach, + + /// Return the object to the pool for potential reuse. + ReturnToPool, +} diff --git a/fastpool/src/lib.rs b/fastpool/src/lib.rs index 57e74b3..7cb95e0 100644 --- a/fastpool/src/lib.rs +++ b/fastpool/src/lib.rs @@ -195,6 +195,7 @@ //! } //! ``` +pub use common::CancellationBehavior; pub use common::ManageObject; pub use common::ObjectStatus; pub use common::QueueStrategy; diff --git a/fastpool/src/unbounded.rs b/fastpool/src/unbounded.rs index a9d03c9..8e482b6 100644 --- a/fastpool/src/unbounded.rs +++ b/fastpool/src/unbounded.rs @@ -99,6 +99,7 @@ use std::ops::DerefMut; use std::sync::Arc; use std::sync::Weak; +use crate::CancellationBehavior; use crate::ManageObject; use crate::ObjectStatus; use crate::QueueStrategy; @@ -114,6 +115,9 @@ pub struct PoolConfig { /// /// Determines the order of objects being queued and dequeued. pub queue_strategy: QueueStrategy, + + /// Behavior when a `get()` call is cancelled + pub cancellation_behavior: CancellationBehavior, } impl Default for PoolConfig { @@ -127,6 +131,7 @@ impl PoolConfig { pub fn new() -> Self { Self { queue_strategy: QueueStrategy::default(), + cancellation_behavior: CancellationBehavior::default(), } } @@ -135,6 +140,12 @@ impl PoolConfig { self.queue_strategy = queue_strategy; self } + + /// Returns a new [`PoolConfig`] with the specified cancellation behavior. + pub fn with_cancellation_behavior(mut self, cancellation_behavior: CancellationBehavior) -> Self { + self.cancellation_behavior = cancellation_behavior; + self + } } /// The current pool status. @@ -324,6 +335,7 @@ impl> Pool { let mut unready_object = UnreadyObject { state: Some(object), pool: Arc::downgrade(self), + cancellation_behavior: self.config.cancellation_behavior, }; let state = unready_object.state(); @@ -337,6 +349,10 @@ impl> Pool { state.status.recycle_count += 1; state.status.recycled = Some(std::time::Instant::now()); break unready_object.ready(); + } else { + // We need to manually detach here as the drop implementation + // depends on the cancellation behaviour + unready_object.detach(); } } }; @@ -538,14 +554,37 @@ impl> Object { } } -/// A wrapper of ObjectStatus that detaches the object from the pool when dropped. +/// A wrapper of ObjectState used during the `is_recyclable` check in `Pool::get`. +/// +/// If the check passes, the object is converted to a ready `Object` via `ready()`. +/// If the check fails, `detach()` should be called to permanently remove the object +/// from the pool. If dropped without calling either method (due to cancellation), +/// the behavior depends on the pool's [`CancellationBehavior`] configuration. struct UnreadyObject = NeverManageObject> { state: Option>, pool: Weak>, + cancellation_behavior: CancellationBehavior, } impl> Drop for UnreadyObject { fn drop(&mut self) { + if let Some(mut state) = self.state.take() { + if let Some(pool) = self.pool.upgrade() { + match self.cancellation_behavior { + CancellationBehavior::Detach => { + pool.detach_object(&mut state.o); + } + CancellationBehavior::ReturnToPool => { + pool.push_back(state); + } + } + } + } + } +} + +impl> UnreadyObject { + fn detach(&mut self) { if let Some(mut state) = self.state.take() { if let Some(pool) = self.pool.upgrade() { pool.detach_object(&mut state.o); diff --git a/fastpool/tests/cancellation_tests.rs b/fastpool/tests/cancellation_tests.rs new file mode 100644 index 0000000..11554ee --- /dev/null +++ b/fastpool/tests/cancellation_tests.rs @@ -0,0 +1,296 @@ +// Copyright 2025 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::convert::Infallible; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use fastpool::CancellationBehavior; +use fastpool::ManageObject; +use fastpool::ObjectStatus; + +struct SlowRecycleManager { + created_count: Arc, + recycle_delay: Duration, +} + +impl SlowRecycleManager { + fn new(created_count: Arc, recycle_delay: Duration) -> Self { + Self { + created_count, + recycle_delay, + } + } +} + +impl ManageObject for SlowRecycleManager { + type Object = usize; + type Error = Infallible; + + async fn create(&self) -> Result { + let id = self.created_count.fetch_add(1, Ordering::SeqCst); + Ok(id) + } + + async fn is_recyclable( + &self, + _o: &mut Self::Object, + _status: &ObjectStatus, + ) -> Result<(), Self::Error> { + tokio::time::sleep(self.recycle_delay).await; + Ok(()) + } +} + +mod bounded_tests { + use super::*; + use fastpool::bounded::Pool; + use fastpool::bounded::PoolConfig; + + /// Test default behavior (Detach): cancelled get() calls detach objects from the pool. + #[tokio::test] + async fn test_default_detach_behavior() { + const MAX_SIZE: usize = 1; + let created_count = Arc::new(AtomicUsize::new(0)); + let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); + let pool = Pool::new(PoolConfig::new(MAX_SIZE), manager); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0); + assert_eq!(pool.status().current_size, 1); + + drop(obj); + assert_eq!(pool.status().current_size, 1); + assert_eq!(pool.status().idle_count, 1); + + let timeout_result = + tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + assert!(timeout_result.is_err(), "Should have timed out"); + + tokio::time::sleep(Duration::from_millis(10)).await; + + let status = pool.status(); + assert_eq!( + status.current_size, 0, + "Pool size should be 0 after cancelled get() with Detach behavior" + ); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 1, "Should be a new object (id=1)"); + assert_eq!(created_count.load(Ordering::SeqCst), 2, "Two objects should have been created"); + } + + /// Test ReturnToPool behavior: cancelled get() calls return objects to the pool. + #[tokio::test] + async fn test_return_to_pool_behavior() { + const MAX_SIZE: usize = 1; + let created_count = Arc::new(AtomicUsize::new(0)); + let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); + let config = PoolConfig::new(MAX_SIZE) + .with_cancellation_behavior(CancellationBehavior::ReturnToPool); + let pool = Pool::new(config, manager); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0); + assert_eq!(pool.status().current_size, 1); + assert_eq!(created_count.load(Ordering::SeqCst), 1); + + drop(obj); + assert_eq!(pool.status().current_size, 1); + assert_eq!(pool.status().idle_count, 1); + + let timeout_result = + tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + assert!(timeout_result.is_err(), "Should have timed out"); + + tokio::time::sleep(Duration::from_millis(10)).await; + + let status = pool.status(); + assert_eq!( + status.current_size, 1, + "Pool size should be preserved after cancelled get() with ReturnToPool behavior" + ); + assert_eq!( + status.idle_count, 1, + "Object should be back in idle state after cancelled get()" + ); + + assert_eq!( + created_count.load(Ordering::SeqCst), + 1, + "No extra objects should be created" + ); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0, "Should get the same object back"); + } + + /// Test that multiple cancelled get() calls with ReturnToPool don't shrink the pool. + #[tokio::test] + async fn test_multiple_cancelled_gets_with_return_to_pool() { + const MAX_SIZE: usize = 3; + let created_count = Arc::new(AtomicUsize::new(0)); + let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); + let config = PoolConfig::new(MAX_SIZE) + .with_cancellation_behavior(CancellationBehavior::ReturnToPool); + let pool = Pool::new(config, manager); + + let obj1 = pool.get().await.unwrap(); + let obj2 = pool.get().await.unwrap(); + let obj3 = pool.get().await.unwrap(); + + drop((obj1, obj2, obj3)); + assert_eq!(pool.status().current_size, 3); + assert_eq!(pool.status().idle_count, 3); + + for _ in 0..5 { + let _ = tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + tokio::time::sleep(Duration::from_millis(5)).await; + } + + let status = pool.status(); + assert_eq!( + status.current_size, 3, + "Pool size should be preserved after multiple cancelled gets" + ); + } + + /// Test that failed is_recyclable still properly detaches objects (regardless of cancellation behavior). + #[tokio::test] + async fn test_failed_recyclable_still_detaches() { + const MAX_SIZE: usize = 1; + let created_count = Arc::new(AtomicUsize::new(0)); + let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(10)); + let config = PoolConfig::new(MAX_SIZE) + .with_cancellation_behavior(CancellationBehavior::ReturnToPool); + let pool = Pool::new(config, manager); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0); + drop(obj); + assert_eq!(pool.status().current_size, 1); + assert_eq!(pool.status().idle_count, 1); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0, "Should get the recycled object"); + assert_eq!(obj.status().recycle_count(), 1, "Should have been recycled once"); + } +} + +mod unbounded_tests { + use super::*; + use fastpool::unbounded::Pool; + use fastpool::unbounded::PoolConfig; + + /// Test default behavior (Detach): cancelled get() calls detach objects from the unbounded pool. + #[tokio::test] + async fn test_default_detach_behavior() { + let created_count = Arc::new(AtomicUsize::new(0)); + let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); + let pool = Pool::new(PoolConfig::default(), manager); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0); + assert_eq!(pool.status().current_size, 1); + + drop(obj); + assert_eq!(pool.status().current_size, 1); + assert_eq!(pool.status().idle_count, 1); + + let timeout_result = + tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + assert!(timeout_result.is_err(), "Should have timed out"); + + tokio::time::sleep(Duration::from_millis(10)).await; + + let status = pool.status(); + assert_eq!( + status.current_size, 0, + "Pool size should be 0 after cancelled get() with Detach behavior" + ); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 1, "Should be a new object (id=1)"); + assert_eq!(created_count.load(Ordering::SeqCst), 2, "Two objects should have been created"); + } + + /// Test ReturnToPool behavior: cancelled get() calls return objects to the unbounded pool. + #[tokio::test] + async fn test_return_to_pool_behavior() { + let created_count = Arc::new(AtomicUsize::new(0)); + let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); + let config = PoolConfig::new() + .with_cancellation_behavior(CancellationBehavior::ReturnToPool); + let pool = Pool::new(config, manager); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0); + assert_eq!(pool.status().current_size, 1); + assert_eq!(created_count.load(Ordering::SeqCst), 1); + + drop(obj); + assert_eq!(pool.status().current_size, 1); + assert_eq!(pool.status().idle_count, 1); + + let timeout_result = + tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + assert!(timeout_result.is_err(), "Should have timed out"); + + tokio::time::sleep(Duration::from_millis(10)).await; + + let status = pool.status(); + assert_eq!( + status.current_size, 1, + "Pool size should be preserved after cancelled get() with ReturnToPool behavior" + ); + assert_eq!( + status.idle_count, 1, + "Object should be back in idle state after cancelled get()" + ); + + // Verify we can still get the same object + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0, "Should get the same object back"); + } + + /// Test that multiple cancelled get() calls with ReturnToPool don't shrink the unbounded pool. + #[tokio::test] + async fn test_multiple_cancelled_gets_with_return_to_pool() { + let created_count = Arc::new(AtomicUsize::new(0)); + let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); + let config = PoolConfig::new() + .with_cancellation_behavior(CancellationBehavior::ReturnToPool); + let pool = Pool::new(config, manager); + + let obj1 = pool.get().await.unwrap(); + let obj2 = pool.get().await.unwrap(); + let obj3 = pool.get().await.unwrap(); + + drop((obj1, obj2, obj3)); + assert_eq!(pool.status().current_size, 3); + assert_eq!(pool.status().idle_count, 3); + + for _ in 0..5 { + let _ = tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + tokio::time::sleep(Duration::from_millis(5)).await; + } + + let status = pool.status(); + assert_eq!( + status.current_size, 3, + "Pool size should be preserved after multiple cancelled gets" + ); + } +} From c238a331c0e6776131ec26836fd28c8b3a306b42 Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 24 Jan 2026 12:57:24 +0800 Subject: [PATCH 2/3] fine tune Signed-off-by: tison --- fastpool/src/bounded.rs | 5 ++- fastpool/src/unbounded.rs | 5 ++- fastpool/tests/cancellation_tests.rs | 53 +++++++++++++++++----------- xtask/src/main.rs | 12 +++---- 4 files changed, 46 insertions(+), 29 deletions(-) diff --git a/fastpool/src/bounded.rs b/fastpool/src/bounded.rs index b701e9b..dc76d7c 100644 --- a/fastpool/src/bounded.rs +++ b/fastpool/src/bounded.rs @@ -124,7 +124,10 @@ impl PoolConfig { } /// Returns a new [`PoolConfig`] with the specified cancellation behavior. - pub fn with_cancellation_behavior(mut self, cancellation_behavior: CancellationBehavior) -> Self { + pub fn with_cancellation_behavior( + mut self, + cancellation_behavior: CancellationBehavior, + ) -> Self { self.cancellation_behavior = cancellation_behavior; self } diff --git a/fastpool/src/unbounded.rs b/fastpool/src/unbounded.rs index 8e482b6..d510dc6 100644 --- a/fastpool/src/unbounded.rs +++ b/fastpool/src/unbounded.rs @@ -142,7 +142,10 @@ impl PoolConfig { } /// Returns a new [`PoolConfig`] with the specified cancellation behavior. - pub fn with_cancellation_behavior(mut self, cancellation_behavior: CancellationBehavior) -> Self { + pub fn with_cancellation_behavior( + mut self, + cancellation_behavior: CancellationBehavior, + ) -> Self { self.cancellation_behavior = cancellation_behavior; self } diff --git a/fastpool/tests/cancellation_tests.rs b/fastpool/tests/cancellation_tests.rs index 11554ee..bc7a08b 100644 --- a/fastpool/tests/cancellation_tests.rs +++ b/fastpool/tests/cancellation_tests.rs @@ -13,8 +13,9 @@ // limitations under the License. use std::convert::Infallible; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::time::Duration; use fastpool::CancellationBehavior; @@ -55,10 +56,11 @@ impl ManageObject for SlowRecycleManager { } mod bounded_tests { - use super::*; use fastpool::bounded::Pool; use fastpool::bounded::PoolConfig; + use super::*; + /// Test default behavior (Detach): cancelled get() calls detach objects from the pool. #[tokio::test] async fn test_default_detach_behavior() { @@ -75,8 +77,7 @@ mod bounded_tests { assert_eq!(pool.status().current_size, 1); assert_eq!(pool.status().idle_count, 1); - let timeout_result = - tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + let timeout_result = tokio::time::timeout(Duration::from_millis(10), pool.get()).await; assert!(timeout_result.is_err(), "Should have timed out"); tokio::time::sleep(Duration::from_millis(10)).await; @@ -89,7 +90,11 @@ mod bounded_tests { let obj = pool.get().await.unwrap(); assert_eq!(*obj, 1, "Should be a new object (id=1)"); - assert_eq!(created_count.load(Ordering::SeqCst), 2, "Two objects should have been created"); + assert_eq!( + created_count.load(Ordering::SeqCst), + 2, + "Two objects should have been created" + ); } /// Test ReturnToPool behavior: cancelled get() calls return objects to the pool. @@ -111,8 +116,7 @@ mod bounded_tests { assert_eq!(pool.status().current_size, 1); assert_eq!(pool.status().idle_count, 1); - let timeout_result = - tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + let timeout_result = tokio::time::timeout(Duration::from_millis(10), pool.get()).await; assert!(timeout_result.is_err(), "Should have timed out"); tokio::time::sleep(Duration::from_millis(10)).await; @@ -167,7 +171,8 @@ mod bounded_tests { ); } - /// Test that failed is_recyclable still properly detaches objects (regardless of cancellation behavior). + /// Test that failed is_recyclable still properly detaches objects (regardless of cancellation + /// behavior). #[tokio::test] async fn test_failed_recyclable_still_detaches() { const MAX_SIZE: usize = 1; @@ -185,16 +190,22 @@ mod bounded_tests { let obj = pool.get().await.unwrap(); assert_eq!(*obj, 0, "Should get the recycled object"); - assert_eq!(obj.status().recycle_count(), 1, "Should have been recycled once"); + assert_eq!( + obj.status().recycle_count(), + 1, + "Should have been recycled once" + ); } } mod unbounded_tests { - use super::*; use fastpool::unbounded::Pool; use fastpool::unbounded::PoolConfig; - /// Test default behavior (Detach): cancelled get() calls detach objects from the unbounded pool. + use super::*; + + /// Test default behavior (Detach): cancelled get() calls detach objects from the unbounded + /// pool. #[tokio::test] async fn test_default_detach_behavior() { let created_count = Arc::new(AtomicUsize::new(0)); @@ -209,8 +220,7 @@ mod unbounded_tests { assert_eq!(pool.status().current_size, 1); assert_eq!(pool.status().idle_count, 1); - let timeout_result = - tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + let timeout_result = tokio::time::timeout(Duration::from_millis(10), pool.get()).await; assert!(timeout_result.is_err(), "Should have timed out"); tokio::time::sleep(Duration::from_millis(10)).await; @@ -223,7 +233,11 @@ mod unbounded_tests { let obj = pool.get().await.unwrap(); assert_eq!(*obj, 1, "Should be a new object (id=1)"); - assert_eq!(created_count.load(Ordering::SeqCst), 2, "Two objects should have been created"); + assert_eq!( + created_count.load(Ordering::SeqCst), + 2, + "Two objects should have been created" + ); } /// Test ReturnToPool behavior: cancelled get() calls return objects to the unbounded pool. @@ -231,8 +245,8 @@ mod unbounded_tests { async fn test_return_to_pool_behavior() { let created_count = Arc::new(AtomicUsize::new(0)); let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); - let config = PoolConfig::new() - .with_cancellation_behavior(CancellationBehavior::ReturnToPool); + let config = + PoolConfig::new().with_cancellation_behavior(CancellationBehavior::ReturnToPool); let pool = Pool::new(config, manager); let obj = pool.get().await.unwrap(); @@ -244,8 +258,7 @@ mod unbounded_tests { assert_eq!(pool.status().current_size, 1); assert_eq!(pool.status().idle_count, 1); - let timeout_result = - tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + let timeout_result = tokio::time::timeout(Duration::from_millis(10), pool.get()).await; assert!(timeout_result.is_err(), "Should have timed out"); tokio::time::sleep(Duration::from_millis(10)).await; @@ -270,8 +283,8 @@ mod unbounded_tests { async fn test_multiple_cancelled_gets_with_return_to_pool() { let created_count = Arc::new(AtomicUsize::new(0)); let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); - let config = PoolConfig::new() - .with_cancellation_behavior(CancellationBehavior::ReturnToPool); + let config = + PoolConfig::new().with_cancellation_behavior(CancellationBehavior::ReturnToPool); let pool = Pool::new(config, manager); let obj1 = pool.get().await.unwrap(); diff --git a/xtask/src/main.rs b/xtask/src/main.rs index 09b259e..0710f5c 100644 --- a/xtask/src/main.rs +++ b/xtask/src/main.rs @@ -63,7 +63,7 @@ struct CommandTest { impl CommandTest { fn run(self) { - run_command(make_test_cmd(self.no_capture, true, &[])); + run_command(make_test_cmd(self.no_capture, &[])); } } @@ -128,12 +128,9 @@ fn make_build_cmd(locked: bool) -> StdCommand { cmd } -fn make_test_cmd(no_capture: bool, default_features: bool, features: &[&str]) -> StdCommand { +fn make_test_cmd(no_capture: bool, features: &[&str]) -> StdCommand { let mut cmd = find_command("cargo"); - cmd.args(["test", "--workspace"]); - if !default_features { - cmd.arg("--no-default-features"); - } + cmd.args(["test", "--workspace", "--no-default-features"]); if !features.is_empty() { cmd.args(["--features", features.join(",").as_str()]); } @@ -145,7 +142,7 @@ fn make_test_cmd(no_capture: bool, default_features: bool, features: &[&str]) -> fn make_format_cmd(fix: bool) -> StdCommand { let mut cmd = find_command("cargo"); - cmd.args(["fmt", "--all"]); + cmd.args(["+nightly", "fmt", "--all"]); if !fix { cmd.arg("--check"); } @@ -155,6 +152,7 @@ fn make_format_cmd(fix: bool) -> StdCommand { fn make_clippy_cmd(fix: bool) -> StdCommand { let mut cmd = find_command("cargo"); cmd.args([ + "+nightly", "clippy", "--tests", "--all-features", From 71adef34b2332c6ca4926dcc3d94ba7103a2e468 Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 24 Jan 2026 13:41:04 +0800 Subject: [PATCH 3/3] rename Signed-off-by: tison --- fastpool/src/bounded.rs | 52 +++++++++---------- fastpool/src/common.rs | 17 ++++-- fastpool/src/lib.rs | 2 +- fastpool/src/unbounded.rs | 48 ++++++++--------- ...on_tests.rs => recycle_cancelled_tests.rs} | 19 ++++--- 5 files changed, 71 insertions(+), 67 deletions(-) rename fastpool/tests/{cancellation_tests.rs => recycle_cancelled_tests.rs} (94%) diff --git a/fastpool/src/bounded.rs b/fastpool/src/bounded.rs index dc76d7c..ca1b7bf 100644 --- a/fastpool/src/bounded.rs +++ b/fastpool/src/bounded.rs @@ -83,10 +83,10 @@ use std::sync::atomic::Ordering; use mea::semaphore::OwnedSemaphorePermit; use mea::semaphore::Semaphore; -use crate::CancellationBehavior; use crate::ManageObject; use crate::ObjectStatus; use crate::QueueStrategy; +use crate::RecycleCancelledStrategy; use crate::RetainResult; use crate::mutex::Mutex; use crate::retain_spec; @@ -103,8 +103,8 @@ pub struct PoolConfig { /// Determines the order of objects being queued and dequeued. pub queue_strategy: QueueStrategy, - /// Behavior when a `get()` call is cancelled - pub cancellation_behavior: CancellationBehavior, + /// Strategy when recycling object has been cancelled. + pub recycle_cancelled_strategy: RecycleCancelledStrategy, } impl PoolConfig { @@ -113,7 +113,7 @@ impl PoolConfig { Self { max_size, queue_strategy: QueueStrategy::default(), - cancellation_behavior: CancellationBehavior::default(), + recycle_cancelled_strategy: RecycleCancelledStrategy::default(), } } @@ -123,12 +123,12 @@ impl PoolConfig { self } - /// Returns a new [`PoolConfig`] with the specified cancellation behavior. - pub fn with_cancellation_behavior( + /// Returns a new [`PoolConfig`] with the specified recycle cancelled strategy. + pub fn with_recycle_cancelled_strategy( mut self, - cancellation_behavior: CancellationBehavior, + recycle_cancelled_strategy: RecycleCancelledStrategy, ) -> Self { - self.cancellation_behavior = cancellation_behavior; + self.recycle_cancelled_strategy = recycle_cancelled_strategy; self } } @@ -324,7 +324,7 @@ impl Pool { let mut unready_object = UnreadyObject { state: Some(object), pool: Arc::downgrade(self), - cancellation_behavior: self.config.cancellation_behavior, + recycle_cancelled_strategy: self.config.recycle_cancelled_strategy, }; let state = unready_object.state(); @@ -340,7 +340,7 @@ impl Pool { break unready_object.ready(permit); } else { // We need to manually detach here as the drop implementation - // depends on the cancellation behaviour + // depends on the recycle cancelled strategy. unready_object.detach(); } } @@ -419,8 +419,6 @@ impl Pool { self.users.fetch_sub(1, Ordering::Relaxed); } - /// This is used when an UnreadyObject is dropped during cancellation, - /// since the scopeguard in get() will handle decrementing users. fn return_to_pool(&self, o: ObjectState) { let mut slots = self.slots.lock(); @@ -544,23 +542,23 @@ impl Object { /// /// If the check passes, the object is converted to a ready `Object` via `ready()`. /// If the check fails, `detach()` should be called to permanently remove the object -/// from the pool. If dropped without calling either method (due to cancellation), -/// the behavior depends on the pool's [`CancellationBehavior`] configuration. +/// from the pool. If dropped without calling either method (due to being cancelled), +/// the behavior depends on the pool's [`RecycleCancelledStrategy`] configuration. struct UnreadyObject { state: Option>, pool: Weak>, - cancellation_behavior: CancellationBehavior, + recycle_cancelled_strategy: RecycleCancelledStrategy, } impl Drop for UnreadyObject { fn drop(&mut self) { if let Some(mut state) = self.state.take() { if let Some(pool) = self.pool.upgrade() { - match self.cancellation_behavior { - CancellationBehavior::Detach => { + match self.recycle_cancelled_strategy { + RecycleCancelledStrategy::Detach => { pool.detach_object(&mut state.o, false); } - CancellationBehavior::ReturnToPool => { + RecycleCancelledStrategy::ReturnToPool => { pool.return_to_pool(state); } } @@ -569,16 +567,6 @@ impl Drop for UnreadyObject { } } -impl UnreadyObject { - fn detach(&mut self) { - if let Some(mut state) = self.state.take() { - if let Some(pool) = self.pool.upgrade() { - pool.detach_object(&mut state.o, false); - } - } - } -} - impl UnreadyObject { fn ready(mut self, permit: OwnedSemaphorePermit) -> Object { // SAFETY: `state` is always `Some` when `UnreadyObject` is owned. @@ -591,6 +579,14 @@ impl UnreadyObject { } } + fn detach(&mut self) { + if let Some(mut state) = self.state.take() { + if let Some(pool) = self.pool.upgrade() { + pool.detach_object(&mut state.o, false); + } + } + } + fn state(&mut self) -> &mut ObjectState { // SAFETY: `state` is always `Some` when `UnreadyObject` is owned. self.state.as_mut().unwrap() diff --git a/fastpool/src/common.rs b/fastpool/src/common.rs index 54af7bb..b5af5c2 100644 --- a/fastpool/src/common.rs +++ b/fastpool/src/common.rs @@ -91,13 +91,24 @@ pub enum QueueStrategy { Lifo, } -/// Behavior when a `get()` call is cancelled during the `is_recyclable()` check. +/// Strategy when recycling object has been cancelled. +/// +/// This enum controls the behavior when the recycling process (specifically the +/// [`ManageObject::is_recyclable`] check) is cancelled; for example, when the +/// `get()` future is dropped. #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] -pub enum CancellationBehavior { - /// Detach the object from the pool (default). +pub enum RecycleCancelledStrategy { + /// Detach the object from the pool. + /// + /// This is the safest option. If the recycling check is cancelled, we assume the object might + /// be in an unknown state or that the check was taking too long for a reason. The object will + /// detach from the pool. #[default] Detach, /// Return the object to the pool for potential reuse. + /// + /// This assumes that interrupting the check does not invalidate the object. The object is put + /// back into the pool. ReturnToPool, } diff --git a/fastpool/src/lib.rs b/fastpool/src/lib.rs index 7cb95e0..57e3938 100644 --- a/fastpool/src/lib.rs +++ b/fastpool/src/lib.rs @@ -195,10 +195,10 @@ //! } //! ``` -pub use common::CancellationBehavior; pub use common::ManageObject; pub use common::ObjectStatus; pub use common::QueueStrategy; +pub use common::RecycleCancelledStrategy; pub use retain_spec::RetainResult; mod common; diff --git a/fastpool/src/unbounded.rs b/fastpool/src/unbounded.rs index d510dc6..9e52b77 100644 --- a/fastpool/src/unbounded.rs +++ b/fastpool/src/unbounded.rs @@ -99,10 +99,10 @@ use std::ops::DerefMut; use std::sync::Arc; use std::sync::Weak; -use crate::CancellationBehavior; use crate::ManageObject; use crate::ObjectStatus; use crate::QueueStrategy; +use crate::RecycleCancelledStrategy; use crate::RetainResult; use crate::mutex::Mutex; use crate::retain_spec; @@ -116,8 +116,8 @@ pub struct PoolConfig { /// Determines the order of objects being queued and dequeued. pub queue_strategy: QueueStrategy, - /// Behavior when a `get()` call is cancelled - pub cancellation_behavior: CancellationBehavior, + /// Strategy when recycling object has been cancelled. + pub recycle_cancelled_strategy: RecycleCancelledStrategy, } impl Default for PoolConfig { @@ -131,7 +131,7 @@ impl PoolConfig { pub fn new() -> Self { Self { queue_strategy: QueueStrategy::default(), - cancellation_behavior: CancellationBehavior::default(), + recycle_cancelled_strategy: RecycleCancelledStrategy::default(), } } @@ -141,12 +141,12 @@ impl PoolConfig { self } - /// Returns a new [`PoolConfig`] with the specified cancellation behavior. - pub fn with_cancellation_behavior( + /// Returns a new [`PoolConfig`] with the specified recycle cancelled strategy. + pub fn with_recycle_cancelled_strategy( mut self, - cancellation_behavior: CancellationBehavior, + recycle_cancelled_strategy: RecycleCancelledStrategy, ) -> Self { - self.cancellation_behavior = cancellation_behavior; + self.recycle_cancelled_strategy = recycle_cancelled_strategy; self } } @@ -338,7 +338,7 @@ impl> Pool { let mut unready_object = UnreadyObject { state: Some(object), pool: Arc::downgrade(self), - cancellation_behavior: self.config.cancellation_behavior, + recycle_cancelled_strategy: self.config.recycle_cancelled_strategy, }; let state = unready_object.state(); @@ -354,7 +354,7 @@ impl> Pool { break unready_object.ready(); } else { // We need to manually detach here as the drop implementation - // depends on the cancellation behaviour + // depends on the recycle cancelled strategy. unready_object.detach(); } } @@ -561,23 +561,23 @@ impl> Object { /// /// If the check passes, the object is converted to a ready `Object` via `ready()`. /// If the check fails, `detach()` should be called to permanently remove the object -/// from the pool. If dropped without calling either method (due to cancellation), -/// the behavior depends on the pool's [`CancellationBehavior`] configuration. +/// from the pool. If dropped without calling either method (due to being cancelled), +/// the behavior depends on the pool's [`RecycleCancelledStrategy`] configuration. struct UnreadyObject = NeverManageObject> { state: Option>, pool: Weak>, - cancellation_behavior: CancellationBehavior, + recycle_cancelled_strategy: RecycleCancelledStrategy, } impl> Drop for UnreadyObject { fn drop(&mut self) { if let Some(mut state) = self.state.take() { if let Some(pool) = self.pool.upgrade() { - match self.cancellation_behavior { - CancellationBehavior::Detach => { + match self.recycle_cancelled_strategy { + RecycleCancelledStrategy::Detach => { pool.detach_object(&mut state.o); } - CancellationBehavior::ReturnToPool => { + RecycleCancelledStrategy::ReturnToPool => { pool.push_back(state); } } @@ -587,6 +587,13 @@ impl> Drop for UnreadyObject { } impl> UnreadyObject { + fn ready(mut self) -> Object { + // SAFETY: `state` is always `Some` when `UnreadyObject` is owned. + let state = Some(self.state.take().unwrap()); + let pool = self.pool.clone(); + Object { state, pool } + } + fn detach(&mut self) { if let Some(mut state) = self.state.take() { if let Some(pool) = self.pool.upgrade() { @@ -594,15 +601,6 @@ impl> UnreadyObject { } } } -} - -impl> UnreadyObject { - fn ready(mut self) -> Object { - // SAFETY: `state` is always `Some` when `UnreadyObject` is owned. - let state = Some(self.state.take().unwrap()); - let pool = self.pool.clone(); - Object { state, pool } - } fn state(&mut self) -> &mut ObjectState { // SAFETY: `state` is always `Some` when `UnreadyObject` is owned. diff --git a/fastpool/tests/cancellation_tests.rs b/fastpool/tests/recycle_cancelled_tests.rs similarity index 94% rename from fastpool/tests/cancellation_tests.rs rename to fastpool/tests/recycle_cancelled_tests.rs index bc7a08b..cc02ad3 100644 --- a/fastpool/tests/cancellation_tests.rs +++ b/fastpool/tests/recycle_cancelled_tests.rs @@ -18,9 +18,9 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::time::Duration; -use fastpool::CancellationBehavior; use fastpool::ManageObject; use fastpool::ObjectStatus; +use fastpool::RecycleCancelledStrategy; struct SlowRecycleManager { created_count: Arc, @@ -104,7 +104,7 @@ mod bounded_tests { let created_count = Arc::new(AtomicUsize::new(0)); let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); let config = PoolConfig::new(MAX_SIZE) - .with_cancellation_behavior(CancellationBehavior::ReturnToPool); + .with_recycle_cancelled_strategy(RecycleCancelledStrategy::ReturnToPool); let pool = Pool::new(config, manager); let obj = pool.get().await.unwrap(); @@ -148,7 +148,7 @@ mod bounded_tests { let created_count = Arc::new(AtomicUsize::new(0)); let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); let config = PoolConfig::new(MAX_SIZE) - .with_cancellation_behavior(CancellationBehavior::ReturnToPool); + .with_recycle_cancelled_strategy(RecycleCancelledStrategy::ReturnToPool); let pool = Pool::new(config, manager); let obj1 = pool.get().await.unwrap(); @@ -171,15 +171,14 @@ mod bounded_tests { ); } - /// Test that failed is_recyclable still properly detaches objects (regardless of cancellation - /// behavior). + /// Test that failed is_recyclable always properly detaches objects. #[tokio::test] async fn test_failed_recyclable_still_detaches() { const MAX_SIZE: usize = 1; let created_count = Arc::new(AtomicUsize::new(0)); let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(10)); let config = PoolConfig::new(MAX_SIZE) - .with_cancellation_behavior(CancellationBehavior::ReturnToPool); + .with_recycle_cancelled_strategy(RecycleCancelledStrategy::ReturnToPool); let pool = Pool::new(config, manager); let obj = pool.get().await.unwrap(); @@ -245,8 +244,8 @@ mod unbounded_tests { async fn test_return_to_pool_behavior() { let created_count = Arc::new(AtomicUsize::new(0)); let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); - let config = - PoolConfig::new().with_cancellation_behavior(CancellationBehavior::ReturnToPool); + let config = PoolConfig::new() + .with_recycle_cancelled_strategy(RecycleCancelledStrategy::ReturnToPool); let pool = Pool::new(config, manager); let obj = pool.get().await.unwrap(); @@ -283,8 +282,8 @@ mod unbounded_tests { async fn test_multiple_cancelled_gets_with_return_to_pool() { let created_count = Arc::new(AtomicUsize::new(0)); let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); - let config = - PoolConfig::new().with_cancellation_behavior(CancellationBehavior::ReturnToPool); + let config = PoolConfig::new() + .with_recycle_cancelled_strategy(RecycleCancelledStrategy::ReturnToPool); let pool = Pool::new(config, manager); let obj1 = pool.get().await.unwrap();