diff --git a/fastpool/src/bounded.rs b/fastpool/src/bounded.rs index 3b08c7b..ca1b7bf 100644 --- a/fastpool/src/bounded.rs +++ b/fastpool/src/bounded.rs @@ -86,6 +86,7 @@ use mea::semaphore::Semaphore; use crate::ManageObject; use crate::ObjectStatus; use crate::QueueStrategy; +use crate::RecycleCancelledStrategy; use crate::RetainResult; use crate::mutex::Mutex; use crate::retain_spec; @@ -101,6 +102,9 @@ pub struct PoolConfig { /// /// Determines the order of objects being queued and dequeued. pub queue_strategy: QueueStrategy, + + /// Strategy when recycling object has been cancelled. + pub recycle_cancelled_strategy: RecycleCancelledStrategy, } impl PoolConfig { @@ -109,6 +113,7 @@ impl PoolConfig { Self { max_size, queue_strategy: QueueStrategy::default(), + recycle_cancelled_strategy: RecycleCancelledStrategy::default(), } } @@ -117,6 +122,15 @@ impl PoolConfig { self.queue_strategy = queue_strategy; self } + + /// Returns a new [`PoolConfig`] with the specified recycle cancelled strategy. + pub fn with_recycle_cancelled_strategy( + mut self, + recycle_cancelled_strategy: RecycleCancelledStrategy, + ) -> Self { + self.recycle_cancelled_strategy = recycle_cancelled_strategy; + self + } } /// The current pool status. @@ -310,6 +324,7 @@ impl Pool { let mut unready_object = UnreadyObject { state: Some(object), pool: Arc::downgrade(self), + recycle_cancelled_strategy: self.config.recycle_cancelled_strategy, }; let state = unready_object.state(); @@ -323,6 +338,10 @@ impl Pool { state.status.recycle_count += 1; state.status.recycled = Some(std::time::Instant::now()); break unready_object.ready(permit); + } else { + // We need to manually detach here as the drop implementation + // depends on the recycle cancelled strategy. + unready_object.detach(); } } }; @@ -396,6 +415,11 @@ impl Pool { } fn push_back(&self, o: ObjectState) { + self.return_to_pool(o); + self.users.fetch_sub(1, Ordering::Relaxed); + } + + fn return_to_pool(&self, o: ObjectState) { let mut slots = self.slots.lock(); assert!( @@ -406,9 +430,6 @@ impl Pool { ); slots.deque.push_back(o); - drop(slots); - - self.users.fetch_sub(1, Ordering::Relaxed); } fn detach_object(&self, o: &mut M::Object, ready: bool) { @@ -517,17 +538,30 @@ impl Object { } } -/// A wrapper of ObjectStatus that detaches the object from the pool when dropped. +/// A wrapper of ObjectState used during the `is_recyclable` check in `Pool::get`. +/// +/// If the check passes, the object is converted to a ready `Object` via `ready()`. +/// If the check fails, `detach()` should be called to permanently remove the object +/// from the pool. If dropped without calling either method (due to being cancelled), +/// the behavior depends on the pool's [`RecycleCancelledStrategy`] configuration. struct UnreadyObject { state: Option>, pool: Weak>, + recycle_cancelled_strategy: RecycleCancelledStrategy, } impl Drop for UnreadyObject { fn drop(&mut self) { if let Some(mut state) = self.state.take() { if let Some(pool) = self.pool.upgrade() { - pool.detach_object(&mut state.o, false); + match self.recycle_cancelled_strategy { + RecycleCancelledStrategy::Detach => { + pool.detach_object(&mut state.o, false); + } + RecycleCancelledStrategy::ReturnToPool => { + pool.return_to_pool(state); + } + } } } } @@ -545,6 +579,14 @@ impl UnreadyObject { } } + fn detach(&mut self) { + if let Some(mut state) = self.state.take() { + if let Some(pool) = self.pool.upgrade() { + pool.detach_object(&mut state.o, false); + } + } + } + fn state(&mut self) -> &mut ObjectState { // SAFETY: `state` is always `Some` when `UnreadyObject` is owned. self.state.as_mut().unwrap() diff --git a/fastpool/src/common.rs b/fastpool/src/common.rs index f5a97eb..b5af5c2 100644 --- a/fastpool/src/common.rs +++ b/fastpool/src/common.rs @@ -90,3 +90,25 @@ pub enum QueueStrategy { /// This strategy behaves like a stack. Lifo, } + +/// Strategy when recycling object has been cancelled. +/// +/// This enum controls the behavior when the recycling process (specifically the +/// [`ManageObject::is_recyclable`] check) is cancelled; for example, when the +/// `get()` future is dropped. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub enum RecycleCancelledStrategy { + /// Detach the object from the pool. + /// + /// This is the safest option. If the recycling check is cancelled, we assume the object might + /// be in an unknown state or that the check was taking too long for a reason. The object will + /// detach from the pool. + #[default] + Detach, + + /// Return the object to the pool for potential reuse. + /// + /// This assumes that interrupting the check does not invalidate the object. The object is put + /// back into the pool. + ReturnToPool, +} diff --git a/fastpool/src/lib.rs b/fastpool/src/lib.rs index 57e74b3..57e3938 100644 --- a/fastpool/src/lib.rs +++ b/fastpool/src/lib.rs @@ -198,6 +198,7 @@ pub use common::ManageObject; pub use common::ObjectStatus; pub use common::QueueStrategy; +pub use common::RecycleCancelledStrategy; pub use retain_spec::RetainResult; mod common; diff --git a/fastpool/src/unbounded.rs b/fastpool/src/unbounded.rs index a9d03c9..9e52b77 100644 --- a/fastpool/src/unbounded.rs +++ b/fastpool/src/unbounded.rs @@ -102,6 +102,7 @@ use std::sync::Weak; use crate::ManageObject; use crate::ObjectStatus; use crate::QueueStrategy; +use crate::RecycleCancelledStrategy; use crate::RetainResult; use crate::mutex::Mutex; use crate::retain_spec; @@ -114,6 +115,9 @@ pub struct PoolConfig { /// /// Determines the order of objects being queued and dequeued. pub queue_strategy: QueueStrategy, + + /// Strategy when recycling object has been cancelled. + pub recycle_cancelled_strategy: RecycleCancelledStrategy, } impl Default for PoolConfig { @@ -127,6 +131,7 @@ impl PoolConfig { pub fn new() -> Self { Self { queue_strategy: QueueStrategy::default(), + recycle_cancelled_strategy: RecycleCancelledStrategy::default(), } } @@ -135,6 +140,15 @@ impl PoolConfig { self.queue_strategy = queue_strategy; self } + + /// Returns a new [`PoolConfig`] with the specified recycle cancelled strategy. + pub fn with_recycle_cancelled_strategy( + mut self, + recycle_cancelled_strategy: RecycleCancelledStrategy, + ) -> Self { + self.recycle_cancelled_strategy = recycle_cancelled_strategy; + self + } } /// The current pool status. @@ -324,6 +338,7 @@ impl> Pool { let mut unready_object = UnreadyObject { state: Some(object), pool: Arc::downgrade(self), + recycle_cancelled_strategy: self.config.recycle_cancelled_strategy, }; let state = unready_object.state(); @@ -337,6 +352,10 @@ impl> Pool { state.status.recycle_count += 1; state.status.recycled = Some(std::time::Instant::now()); break unready_object.ready(); + } else { + // We need to manually detach here as the drop implementation + // depends on the recycle cancelled strategy. + unready_object.detach(); } } }; @@ -538,17 +557,30 @@ impl> Object { } } -/// A wrapper of ObjectStatus that detaches the object from the pool when dropped. +/// A wrapper of ObjectState used during the `is_recyclable` check in `Pool::get`. +/// +/// If the check passes, the object is converted to a ready `Object` via `ready()`. +/// If the check fails, `detach()` should be called to permanently remove the object +/// from the pool. If dropped without calling either method (due to being cancelled), +/// the behavior depends on the pool's [`RecycleCancelledStrategy`] configuration. struct UnreadyObject = NeverManageObject> { state: Option>, pool: Weak>, + recycle_cancelled_strategy: RecycleCancelledStrategy, } impl> Drop for UnreadyObject { fn drop(&mut self) { if let Some(mut state) = self.state.take() { if let Some(pool) = self.pool.upgrade() { - pool.detach_object(&mut state.o); + match self.recycle_cancelled_strategy { + RecycleCancelledStrategy::Detach => { + pool.detach_object(&mut state.o); + } + RecycleCancelledStrategy::ReturnToPool => { + pool.push_back(state); + } + } } } } @@ -562,6 +594,14 @@ impl> UnreadyObject { Object { state, pool } } + fn detach(&mut self) { + if let Some(mut state) = self.state.take() { + if let Some(pool) = self.pool.upgrade() { + pool.detach_object(&mut state.o); + } + } + } + fn state(&mut self) -> &mut ObjectState { // SAFETY: `state` is always `Some` when `UnreadyObject` is owned. self.state.as_mut().unwrap() diff --git a/fastpool/tests/recycle_cancelled_tests.rs b/fastpool/tests/recycle_cancelled_tests.rs new file mode 100644 index 0000000..cc02ad3 --- /dev/null +++ b/fastpool/tests/recycle_cancelled_tests.rs @@ -0,0 +1,308 @@ +// Copyright 2025 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::convert::Infallible; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use fastpool::ManageObject; +use fastpool::ObjectStatus; +use fastpool::RecycleCancelledStrategy; + +struct SlowRecycleManager { + created_count: Arc, + recycle_delay: Duration, +} + +impl SlowRecycleManager { + fn new(created_count: Arc, recycle_delay: Duration) -> Self { + Self { + created_count, + recycle_delay, + } + } +} + +impl ManageObject for SlowRecycleManager { + type Object = usize; + type Error = Infallible; + + async fn create(&self) -> Result { + let id = self.created_count.fetch_add(1, Ordering::SeqCst); + Ok(id) + } + + async fn is_recyclable( + &self, + _o: &mut Self::Object, + _status: &ObjectStatus, + ) -> Result<(), Self::Error> { + tokio::time::sleep(self.recycle_delay).await; + Ok(()) + } +} + +mod bounded_tests { + use fastpool::bounded::Pool; + use fastpool::bounded::PoolConfig; + + use super::*; + + /// Test default behavior (Detach): cancelled get() calls detach objects from the pool. + #[tokio::test] + async fn test_default_detach_behavior() { + const MAX_SIZE: usize = 1; + let created_count = Arc::new(AtomicUsize::new(0)); + let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); + let pool = Pool::new(PoolConfig::new(MAX_SIZE), manager); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0); + assert_eq!(pool.status().current_size, 1); + + drop(obj); + assert_eq!(pool.status().current_size, 1); + assert_eq!(pool.status().idle_count, 1); + + let timeout_result = tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + assert!(timeout_result.is_err(), "Should have timed out"); + + tokio::time::sleep(Duration::from_millis(10)).await; + + let status = pool.status(); + assert_eq!( + status.current_size, 0, + "Pool size should be 0 after cancelled get() with Detach behavior" + ); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 1, "Should be a new object (id=1)"); + assert_eq!( + created_count.load(Ordering::SeqCst), + 2, + "Two objects should have been created" + ); + } + + /// Test ReturnToPool behavior: cancelled get() calls return objects to the pool. + #[tokio::test] + async fn test_return_to_pool_behavior() { + const MAX_SIZE: usize = 1; + let created_count = Arc::new(AtomicUsize::new(0)); + let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); + let config = PoolConfig::new(MAX_SIZE) + .with_recycle_cancelled_strategy(RecycleCancelledStrategy::ReturnToPool); + let pool = Pool::new(config, manager); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0); + assert_eq!(pool.status().current_size, 1); + assert_eq!(created_count.load(Ordering::SeqCst), 1); + + drop(obj); + assert_eq!(pool.status().current_size, 1); + assert_eq!(pool.status().idle_count, 1); + + let timeout_result = tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + assert!(timeout_result.is_err(), "Should have timed out"); + + tokio::time::sleep(Duration::from_millis(10)).await; + + let status = pool.status(); + assert_eq!( + status.current_size, 1, + "Pool size should be preserved after cancelled get() with ReturnToPool behavior" + ); + assert_eq!( + status.idle_count, 1, + "Object should be back in idle state after cancelled get()" + ); + + assert_eq!( + created_count.load(Ordering::SeqCst), + 1, + "No extra objects should be created" + ); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0, "Should get the same object back"); + } + + /// Test that multiple cancelled get() calls with ReturnToPool don't shrink the pool. + #[tokio::test] + async fn test_multiple_cancelled_gets_with_return_to_pool() { + const MAX_SIZE: usize = 3; + let created_count = Arc::new(AtomicUsize::new(0)); + let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); + let config = PoolConfig::new(MAX_SIZE) + .with_recycle_cancelled_strategy(RecycleCancelledStrategy::ReturnToPool); + let pool = Pool::new(config, manager); + + let obj1 = pool.get().await.unwrap(); + let obj2 = pool.get().await.unwrap(); + let obj3 = pool.get().await.unwrap(); + + drop((obj1, obj2, obj3)); + assert_eq!(pool.status().current_size, 3); + assert_eq!(pool.status().idle_count, 3); + + for _ in 0..5 { + let _ = tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + tokio::time::sleep(Duration::from_millis(5)).await; + } + + let status = pool.status(); + assert_eq!( + status.current_size, 3, + "Pool size should be preserved after multiple cancelled gets" + ); + } + + /// Test that failed is_recyclable always properly detaches objects. + #[tokio::test] + async fn test_failed_recyclable_still_detaches() { + const MAX_SIZE: usize = 1; + let created_count = Arc::new(AtomicUsize::new(0)); + let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(10)); + let config = PoolConfig::new(MAX_SIZE) + .with_recycle_cancelled_strategy(RecycleCancelledStrategy::ReturnToPool); + let pool = Pool::new(config, manager); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0); + drop(obj); + assert_eq!(pool.status().current_size, 1); + assert_eq!(pool.status().idle_count, 1); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0, "Should get the recycled object"); + assert_eq!( + obj.status().recycle_count(), + 1, + "Should have been recycled once" + ); + } +} + +mod unbounded_tests { + use fastpool::unbounded::Pool; + use fastpool::unbounded::PoolConfig; + + use super::*; + + /// Test default behavior (Detach): cancelled get() calls detach objects from the unbounded + /// pool. + #[tokio::test] + async fn test_default_detach_behavior() { + let created_count = Arc::new(AtomicUsize::new(0)); + let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); + let pool = Pool::new(PoolConfig::default(), manager); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0); + assert_eq!(pool.status().current_size, 1); + + drop(obj); + assert_eq!(pool.status().current_size, 1); + assert_eq!(pool.status().idle_count, 1); + + let timeout_result = tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + assert!(timeout_result.is_err(), "Should have timed out"); + + tokio::time::sleep(Duration::from_millis(10)).await; + + let status = pool.status(); + assert_eq!( + status.current_size, 0, + "Pool size should be 0 after cancelled get() with Detach behavior" + ); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 1, "Should be a new object (id=1)"); + assert_eq!( + created_count.load(Ordering::SeqCst), + 2, + "Two objects should have been created" + ); + } + + /// Test ReturnToPool behavior: cancelled get() calls return objects to the unbounded pool. + #[tokio::test] + async fn test_return_to_pool_behavior() { + let created_count = Arc::new(AtomicUsize::new(0)); + let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); + let config = PoolConfig::new() + .with_recycle_cancelled_strategy(RecycleCancelledStrategy::ReturnToPool); + let pool = Pool::new(config, manager); + + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0); + assert_eq!(pool.status().current_size, 1); + assert_eq!(created_count.load(Ordering::SeqCst), 1); + + drop(obj); + assert_eq!(pool.status().current_size, 1); + assert_eq!(pool.status().idle_count, 1); + + let timeout_result = tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + assert!(timeout_result.is_err(), "Should have timed out"); + + tokio::time::sleep(Duration::from_millis(10)).await; + + let status = pool.status(); + assert_eq!( + status.current_size, 1, + "Pool size should be preserved after cancelled get() with ReturnToPool behavior" + ); + assert_eq!( + status.idle_count, 1, + "Object should be back in idle state after cancelled get()" + ); + + // Verify we can still get the same object + let obj = pool.get().await.unwrap(); + assert_eq!(*obj, 0, "Should get the same object back"); + } + + /// Test that multiple cancelled get() calls with ReturnToPool don't shrink the unbounded pool. + #[tokio::test] + async fn test_multiple_cancelled_gets_with_return_to_pool() { + let created_count = Arc::new(AtomicUsize::new(0)); + let manager = SlowRecycleManager::new(created_count.clone(), Duration::from_millis(100)); + let config = PoolConfig::new() + .with_recycle_cancelled_strategy(RecycleCancelledStrategy::ReturnToPool); + let pool = Pool::new(config, manager); + + let obj1 = pool.get().await.unwrap(); + let obj2 = pool.get().await.unwrap(); + let obj3 = pool.get().await.unwrap(); + + drop((obj1, obj2, obj3)); + assert_eq!(pool.status().current_size, 3); + assert_eq!(pool.status().idle_count, 3); + + for _ in 0..5 { + let _ = tokio::time::timeout(Duration::from_millis(10), pool.get()).await; + tokio::time::sleep(Duration::from_millis(5)).await; + } + + let status = pool.status(); + assert_eq!( + status.current_size, 3, + "Pool size should be preserved after multiple cancelled gets" + ); + } +} diff --git a/xtask/src/main.rs b/xtask/src/main.rs index 09b259e..0710f5c 100644 --- a/xtask/src/main.rs +++ b/xtask/src/main.rs @@ -63,7 +63,7 @@ struct CommandTest { impl CommandTest { fn run(self) { - run_command(make_test_cmd(self.no_capture, true, &[])); + run_command(make_test_cmd(self.no_capture, &[])); } } @@ -128,12 +128,9 @@ fn make_build_cmd(locked: bool) -> StdCommand { cmd } -fn make_test_cmd(no_capture: bool, default_features: bool, features: &[&str]) -> StdCommand { +fn make_test_cmd(no_capture: bool, features: &[&str]) -> StdCommand { let mut cmd = find_command("cargo"); - cmd.args(["test", "--workspace"]); - if !default_features { - cmd.arg("--no-default-features"); - } + cmd.args(["test", "--workspace", "--no-default-features"]); if !features.is_empty() { cmd.args(["--features", features.join(",").as_str()]); } @@ -145,7 +142,7 @@ fn make_test_cmd(no_capture: bool, default_features: bool, features: &[&str]) -> fn make_format_cmd(fix: bool) -> StdCommand { let mut cmd = find_command("cargo"); - cmd.args(["fmt", "--all"]); + cmd.args(["+nightly", "fmt", "--all"]); if !fix { cmd.arg("--check"); } @@ -155,6 +152,7 @@ fn make_format_cmd(fix: bool) -> StdCommand { fn make_clippy_cmd(fix: bool) -> StdCommand { let mut cmd = find_command("cargo"); cmd.args([ + "+nightly", "clippy", "--tests", "--all-features",