Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions fastpool/src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ use mea::semaphore::Semaphore;
use crate::ManageObject;
use crate::ObjectStatus;
use crate::QueueStrategy;
use crate::RecycleCancelledStrategy;
use crate::RetainResult;
use crate::mutex::Mutex;
use crate::retain_spec;
Expand All @@ -101,6 +102,9 @@ pub struct PoolConfig {
///
/// Determines the order of objects being queued and dequeued.
pub queue_strategy: QueueStrategy,

/// Strategy when recycling object has been cancelled.
pub recycle_cancelled_strategy: RecycleCancelledStrategy,
}

impl PoolConfig {
Expand All @@ -109,6 +113,7 @@ impl PoolConfig {
Self {
max_size,
queue_strategy: QueueStrategy::default(),
recycle_cancelled_strategy: RecycleCancelledStrategy::default(),
}
}

Expand All @@ -117,6 +122,15 @@ impl PoolConfig {
self.queue_strategy = queue_strategy;
self
}

/// Returns a new [`PoolConfig`] with the specified recycle cancelled strategy.
pub fn with_recycle_cancelled_strategy(
mut self,
recycle_cancelled_strategy: RecycleCancelledStrategy,
) -> Self {
self.recycle_cancelled_strategy = recycle_cancelled_strategy;
self
}
}

/// The current pool status.
Expand Down Expand Up @@ -310,6 +324,7 @@ impl<M: ManageObject> Pool<M> {
let mut unready_object = UnreadyObject {
state: Some(object),
pool: Arc::downgrade(self),
recycle_cancelled_strategy: self.config.recycle_cancelled_strategy,
};

let state = unready_object.state();
Expand All @@ -323,6 +338,10 @@ impl<M: ManageObject> Pool<M> {
state.status.recycle_count += 1;
state.status.recycled = Some(std::time::Instant::now());
break unready_object.ready(permit);
} else {
// We need to manually detach here as the drop implementation
// depends on the recycle cancelled strategy.
unready_object.detach();
}
}
};
Expand Down Expand Up @@ -396,6 +415,11 @@ impl<M: ManageObject> Pool<M> {
}

fn push_back(&self, o: ObjectState<M::Object>) {
self.return_to_pool(o);
self.users.fetch_sub(1, Ordering::Relaxed);
}

fn return_to_pool(&self, o: ObjectState<M::Object>) {
let mut slots = self.slots.lock();

assert!(
Expand All @@ -406,9 +430,6 @@ impl<M: ManageObject> Pool<M> {
);

slots.deque.push_back(o);
drop(slots);

self.users.fetch_sub(1, Ordering::Relaxed);
}

fn detach_object(&self, o: &mut M::Object, ready: bool) {
Expand Down Expand Up @@ -517,17 +538,30 @@ impl<M: ManageObject> Object<M> {
}
}

/// A wrapper of ObjectStatus that detaches the object from the pool when dropped.
/// A wrapper of ObjectState used during the `is_recyclable` check in `Pool::get`.
///
/// If the check passes, the object is converted to a ready `Object` via `ready()`.
/// If the check fails, `detach()` should be called to permanently remove the object
/// from the pool. If dropped without calling either method (due to being cancelled),
/// the behavior depends on the pool's [`RecycleCancelledStrategy`] configuration.
struct UnreadyObject<M: ManageObject> {
state: Option<ObjectState<M::Object>>,
pool: Weak<Pool<M>>,
recycle_cancelled_strategy: RecycleCancelledStrategy,
}

impl<M: ManageObject> Drop for UnreadyObject<M> {
fn drop(&mut self) {
if let Some(mut state) = self.state.take() {
if let Some(pool) = self.pool.upgrade() {
pool.detach_object(&mut state.o, false);
match self.recycle_cancelled_strategy {
RecycleCancelledStrategy::Detach => {
pool.detach_object(&mut state.o, false);
}
RecycleCancelledStrategy::ReturnToPool => {
pool.return_to_pool(state);
}
}
}
}
}
Expand All @@ -545,6 +579,14 @@ impl<M: ManageObject> UnreadyObject<M> {
}
}

fn detach(&mut self) {
if let Some(mut state) = self.state.take() {
if let Some(pool) = self.pool.upgrade() {
pool.detach_object(&mut state.o, false);
}
}
}

fn state(&mut self) -> &mut ObjectState<M::Object> {
// SAFETY: `state` is always `Some` when `UnreadyObject` is owned.
self.state.as_mut().unwrap()
Expand Down
22 changes: 22 additions & 0 deletions fastpool/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,25 @@ pub enum QueueStrategy {
/// This strategy behaves like a stack.
Lifo,
}

/// Strategy when recycling object has been cancelled.
///
/// This enum controls the behavior when the recycling process (specifically the
/// [`ManageObject::is_recyclable`] check) is cancelled; for example, when the
/// `get()` future is dropped.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum RecycleCancelledStrategy {
/// Detach the object from the pool.
///
/// This is the safest option. If the recycling check is cancelled, we assume the object might
/// be in an unknown state or that the check was taking too long for a reason. The object will
/// detach from the pool.
#[default]
Detach,

/// Return the object to the pool for potential reuse.
///
/// This assumes that interrupting the check does not invalidate the object. The object is put
/// back into the pool.
ReturnToPool,
}
1 change: 1 addition & 0 deletions fastpool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@
pub use common::ManageObject;
pub use common::ObjectStatus;
pub use common::QueueStrategy;
pub use common::RecycleCancelledStrategy;
pub use retain_spec::RetainResult;

mod common;
Expand Down
44 changes: 42 additions & 2 deletions fastpool/src/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ use std::sync::Weak;
use crate::ManageObject;
use crate::ObjectStatus;
use crate::QueueStrategy;
use crate::RecycleCancelledStrategy;
use crate::RetainResult;
use crate::mutex::Mutex;
use crate::retain_spec;
Expand All @@ -114,6 +115,9 @@ pub struct PoolConfig {
///
/// Determines the order of objects being queued and dequeued.
pub queue_strategy: QueueStrategy,

/// Strategy when recycling object has been cancelled.
pub recycle_cancelled_strategy: RecycleCancelledStrategy,
}

impl Default for PoolConfig {
Expand All @@ -127,6 +131,7 @@ impl PoolConfig {
pub fn new() -> Self {
Self {
queue_strategy: QueueStrategy::default(),
recycle_cancelled_strategy: RecycleCancelledStrategy::default(),
}
}

Expand All @@ -135,6 +140,15 @@ impl PoolConfig {
self.queue_strategy = queue_strategy;
self
}

/// Returns a new [`PoolConfig`] with the specified recycle cancelled strategy.
pub fn with_recycle_cancelled_strategy(
mut self,
recycle_cancelled_strategy: RecycleCancelledStrategy,
) -> Self {
self.recycle_cancelled_strategy = recycle_cancelled_strategy;
self
}
}

/// The current pool status.
Expand Down Expand Up @@ -324,6 +338,7 @@ impl<T, M: ManageObject<Object = T>> Pool<T, M> {
let mut unready_object = UnreadyObject {
state: Some(object),
pool: Arc::downgrade(self),
recycle_cancelled_strategy: self.config.recycle_cancelled_strategy,
};

let state = unready_object.state();
Expand All @@ -337,6 +352,10 @@ impl<T, M: ManageObject<Object = T>> Pool<T, M> {
state.status.recycle_count += 1;
state.status.recycled = Some(std::time::Instant::now());
break unready_object.ready();
} else {
// We need to manually detach here as the drop implementation
// depends on the recycle cancelled strategy.
unready_object.detach();
}
}
};
Expand Down Expand Up @@ -538,17 +557,30 @@ impl<T, M: ManageObject<Object = T>> Object<T, M> {
}
}

/// A wrapper of ObjectStatus that detaches the object from the pool when dropped.
/// A wrapper of ObjectState used during the `is_recyclable` check in `Pool::get`.
///
/// If the check passes, the object is converted to a ready `Object` via `ready()`.
/// If the check fails, `detach()` should be called to permanently remove the object
/// from the pool. If dropped without calling either method (due to being cancelled),
/// the behavior depends on the pool's [`RecycleCancelledStrategy`] configuration.
struct UnreadyObject<T, M: ManageObject<Object = T> = NeverManageObject<T>> {
state: Option<ObjectState<T>>,
pool: Weak<Pool<T, M>>,
recycle_cancelled_strategy: RecycleCancelledStrategy,
}

impl<T, M: ManageObject<Object = T>> Drop for UnreadyObject<T, M> {
fn drop(&mut self) {
if let Some(mut state) = self.state.take() {
if let Some(pool) = self.pool.upgrade() {
pool.detach_object(&mut state.o);
match self.recycle_cancelled_strategy {
RecycleCancelledStrategy::Detach => {
pool.detach_object(&mut state.o);
}
RecycleCancelledStrategy::ReturnToPool => {
pool.push_back(state);
}
}
}
}
}
Expand All @@ -562,6 +594,14 @@ impl<T, M: ManageObject<Object = T>> UnreadyObject<T, M> {
Object { state, pool }
}

fn detach(&mut self) {
if let Some(mut state) = self.state.take() {
if let Some(pool) = self.pool.upgrade() {
pool.detach_object(&mut state.o);
}
}
}

fn state(&mut self) -> &mut ObjectState<T> {
// SAFETY: `state` is always `Some` when `UnreadyObject` is owned.
self.state.as_mut().unwrap()
Expand Down
Loading