use anyhow::Context;
use std::{
collections::VecDeque,
ops::{Deref, DerefMut},
sync::{Condvar, Mutex},
time::Duration,
};
use thiserror::Error;
use crate::StdResult;
/// Errors raised by the resource pool operations of this module.
#[derive(Error, Debug)]
pub enum ResourcePoolError {
/// A mutex guarding the pool state was poisoned when trying to lock it
/// (or while waiting on the pool condition variable).
#[error("Poisoned mutex caused error during acquire lock on resource pool")]
PoisonedLock,
/// No resource could be acquired from the pool before the timeout elapsed.
#[error("Acquire resource has timed out")]
AcquireTimeout,
}
/// Pool of reusable resources of type `T`.
///
/// Available resources are kept in a queue protected by a mutex; a condition
/// variable is notified when a resource is pushed back into the queue.
pub struct ResourcePool<T: Reset + Send + Sync> {
/// The size of the pool, as given at construction time.
size: usize,
/// Discriminant of the pool; resources given back with a different
/// discriminant are not put back in the queue.
discriminant: Mutex<u64>,
/// Resources currently available in the pool.
resources: Mutex<VecDeque<T>>,
/// Condition variable notified when a resource is given back to the pool.
not_empty: Condvar,
}
impl<T: Reset + Send + Sync> ResourcePool<T> {
    /// Create a new resource pool.
    ///
    /// `pool_size` is the maximum number of resources the pool accepts back, and
    /// `resources` are the resources initially available. The discriminant starts at `0`.
    /// No check is made that `resources.len()` matches `pool_size`.
    pub fn new(pool_size: usize, resources: Vec<T>) -> Self {
        Self {
            size: pool_size,
            discriminant: Mutex::new(0),
            resources: Mutex::new(resources.into()),
            not_empty: Condvar::new(),
        }
    }

    /// Acquire a resource from the pool, waiting at most `timeout` for one to be available.
    ///
    /// The returned [ResourcePoolItem] gives the resource back to the pool when dropped.
    ///
    /// # Errors
    ///
    /// - [ResourcePoolError::PoisonedLock] if the pool mutex is poisoned.
    /// - [ResourcePoolError::AcquireTimeout] if the pool is still empty once `timeout`
    ///   has elapsed.
    pub fn acquire_resource(&self, timeout: Duration) -> StdResult<ResourcePoolItem<'_, T>> {
        let resources = self
            .resources
            .lock()
            .map_err(|_| ResourcePoolError::PoisonedLock)
            .with_context(|| "Resource pool 'acquire_resource' failed locking Mutex")?;

        // `wait_timeout_while` bounds the *total* waiting time by `timeout` (a spurious
        // wakeup, or another thread winning the race for a resource, does not restart the
        // countdown) and only reports a timeout if the pool is still empty at that point.
        let (mut resources, wait_result) = self
            .not_empty
            .wait_timeout_while(resources, timeout, |available| available.is_empty())
            .map_err(|_| ResourcePoolError::PoisonedLock)
            .with_context(|| "Resource pool 'acquire_resource' failed waiting for resource")?;

        if wait_result.timed_out() {
            return Err(ResourcePoolError::AcquireTimeout)
                .with_context(|| "Resource pool 'acquire_resource' has timed out");
        }

        let resource = resources
            .pop_front()
            .expect("the resource pool can not be empty after a successful wait");

        Ok(ResourcePoolItem::new(self, resource))
    }

    /// Give back a resource to the pool.
    ///
    /// The resource is reset first. It is then put back in the pool only if the pool is
    /// not already full and if `discriminant` matches the current discriminant of the
    /// pool; otherwise it is dropped and `Ok(())` is still returned.
    ///
    /// # Errors
    ///
    /// Returns the error of [Reset::reset] if the reset fails, or
    /// [ResourcePoolError::PoisonedLock] if a pool mutex is poisoned.
    pub fn give_back_resource(&self, mut resource: T, discriminant: u64) -> StdResult<()> {
        resource.reset()?;

        let mut resources = self
            .resources
            .lock()
            .map_err(|_| ResourcePoolError::PoisonedLock)
            .with_context(|| "Resource pool 'give_back_resource' failed locking Mutex")?;

        // The capacity check is done under the same lock as the push, so that concurrent
        // callers can not make the pool grow past its size.
        if resources.len() >= self.size {
            return Ok(());
        }
        if self.discriminant()? != discriminant {
            // Stale resource: it was acquired under a previous discriminant.
            return Ok(());
        }

        resources.push_back(resource);
        self.not_empty.notify_one();

        Ok(())
    }

    /// Give back a resource pool item to the pool.
    ///
    /// If the resource was already taken out of the item, nothing happens. Otherwise the
    /// resource is given back with the discriminant recorded in the item when it was
    /// created, so a stale item is not put back in the pool.
    ///
    /// # Errors
    ///
    /// Returns the error of [Self::give_back_resource], if any.
    pub fn give_back_resource_pool_item(
        &self,
        resource_pool_item: ResourcePoolItem<'_, T>,
    ) -> StdResult<()> {
        let mut resource_pool_item = resource_pool_item;
        let discriminant = resource_pool_item.discriminant();

        match resource_pool_item.take() {
            Some(resource) => self.give_back_resource(resource, discriminant),
            None => Ok(()),
        }
    }

    /// Remove all the resources currently available in the pool.
    ///
    /// # Panics
    ///
    /// Panics if the mutex protecting the resources is poisoned.
    pub fn clear(&self) {
        let mut resources = self.resources.lock().unwrap();
        resources.clear();
    }

    /// Get the current discriminant of the pool.
    ///
    /// # Errors
    ///
    /// Returns [ResourcePoolError::PoisonedLock] if the discriminant mutex is poisoned.
    pub fn discriminant(&self) -> StdResult<u64> {
        Ok(*self
            .discriminant
            .lock()
            .map_err(|_| ResourcePoolError::PoisonedLock)
            .with_context(|| "Resource pool 'discriminant' failed locking Mutex")?)
    }

    /// Set the discriminant of the pool.
    ///
    /// # Errors
    ///
    /// Returns [ResourcePoolError::PoisonedLock] if the discriminant mutex is poisoned.
    pub fn set_discriminant(&self, discriminant: u64) -> StdResult<()> {
        let mut discriminant_guard = self
            .discriminant
            .lock()
            .map_err(|_| ResourcePoolError::PoisonedLock)
            .with_context(|| "Resource pool 'set_discriminant' failed locking Mutex")?;
        *discriminant_guard = discriminant;

        Ok(())
    }

    /// Count the resources currently available in the pool.
    ///
    /// # Errors
    ///
    /// Returns [ResourcePoolError::PoisonedLock] if the resources mutex is poisoned.
    pub fn count(&self) -> StdResult<usize> {
        Ok(self
            .resources
            .lock()
            .map_err(|_| ResourcePoolError::PoisonedLock)
            .with_context(|| "Resource pool 'count' failed locking Mutex")?
            .len())
    }

    /// Get the size of the pool, as given at construction time.
    pub fn size(&self) -> usize {
        self.size
    }
}
/// Resource borrowed from a [ResourcePool], together with a reference to that pool.
pub struct ResourcePoolItem<'a, T: Reset + Send + Sync> {
/// The pool the resource is given back to.
resource_pool: &'a ResourcePool<T>,
/// Discriminant of the pool read when the item was created.
discriminant: u64,
/// The wrapped resource; `None` once it has been taken out of the item.
resource: Option<T>,
}
impl<'a, T: Reset + Send + Sync> ResourcePoolItem<'a, T> {
    /// Wrap `resource` in an item attached to `resource_pool`.
    ///
    /// The current discriminant of the pool is recorded in the item.
    ///
    /// # Panics
    ///
    /// Panics if the discriminant mutex of the pool is poisoned.
    pub fn new(resource_pool: &'a ResourcePool<T>, resource: T) -> Self {
        // The guard only lives for the duration of this block.
        let discriminant = {
            let current = resource_pool.discriminant.lock().unwrap();
            *current
        };

        Self {
            resource: Some(resource),
            discriminant,
            resource_pool,
        }
    }

    /// Get the discriminant recorded when the item was created.
    pub fn discriminant(&self) -> u64 {
        self.discriminant
    }

    /// Move the resource out of the item, leaving `None` in its place.
    fn take(&mut self) -> Option<T> {
        std::mem::take(&mut self.resource)
    }
}
impl<T: Reset + Send + Sync> Deref for ResourcePoolItem<'_, T> {
    type Target = T;

    /// Borrow the wrapped resource.
    ///
    /// Panics if the resource has already been taken out of the item.
    fn deref(&self) -> &Self::Target {
        let inner: Option<&T> = self.resource.as_ref();
        inner.unwrap()
    }
}
impl<T: Reset + Send + Sync> DerefMut for ResourcePoolItem<'_, T> {
    /// Mutably borrow the wrapped resource.
    ///
    /// Panics if the resource has already been taken out of the item.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let inner: Option<&mut T> = self.resource.as_mut();
        inner.unwrap()
    }
}
impl<T: Reset + Send + Sync> Drop for ResourcePoolItem<'_, T> {
    /// Give the resource back to its pool, unless it was already taken out of the item.
    fn drop(&mut self) {
        if let Some(resource) = self.take() {
            // `drop` has no way to report a failure: the result is deliberately ignored.
            let _ = self
                .resource_pool
                .give_back_resource(resource, self.discriminant);
        }
    }
}
/// Trait implemented by the resources that can be stored in a resource pool.
pub trait Reset {
/// Reset the resource.
///
/// The default implementation does nothing and returns `Ok(())`.
fn reset(&mut self) -> StdResult<()> {
Ok(())
}
}
// `Reset` implementation for `String` relying on the default (no-op) `reset`.
// NOTE(review): presumably only compiled for tests / test tooling through
// `cfg_test_tools!` — confirm against the macro definition.
cfg_test_tools! {
impl Reset for String {}
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Timeout used when a resource is expected to be immediately available.
    const SHORT_TIMEOUT: Duration = Duration::from_millis(10);

    /// Resource recording whether it has been reset.
    #[derive(Default)]
    struct TestResetResource {
        reset: bool,
    }

    impl Reset for TestResetResource {
        fn reset(&mut self) -> StdResult<()> {
            self.reset = true;

            Ok(())
        }
    }

    /// Build a pool of `pool_size` strings ("0", "1", ...), returning the strings
    /// alongside the pool.
    fn setup_string_pool(pool_size: usize) -> (Vec<String>, ResourcePool<String>) {
        let resources: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
        let pool = ResourcePool::<String>::new(pool_size, resources.clone());

        (resources, pool)
    }

    // The pool API is fully synchronous (it blocks on a `Condvar`), so these are plain
    // `#[test]`s: no async runtime is needed and none is blocked while waiting.

    #[test]
    fn test_resource_pool_acquire_returns_resource_when_available() {
        let pool_size = 10;
        let (resources_expected, pool) = setup_string_pool(pool_size);

        let mut resources_items: Vec<_> = (0..pool_size)
            .map(|_| pool.acquire_resource(SHORT_TIMEOUT).unwrap())
            .collect();
        let resources_result = resources_items
            .iter_mut()
            .map(|resource_item| resource_item.take().unwrap())
            .collect::<Vec<_>>();

        assert_eq!(resources_expected, resources_result);
        assert_eq!(pool.count().unwrap(), 0);
    }

    #[test]
    fn test_resource_pool_acquire_locks_until_timeout_when_no_resource_available() {
        let pool_size = 10;
        let (_, pool) = setup_string_pool(pool_size);

        // Keep every resource borrowed so that the pool stays empty.
        let _resources_items: Vec<_> = (0..pool_size)
            .map(|_| pool.acquire_resource(SHORT_TIMEOUT).unwrap())
            .collect();

        assert!(pool.acquire_resource(Duration::from_millis(1000)).is_err());
    }

    #[test]
    fn test_resource_pool_clears_successfully() {
        let pool_size = 10;
        let (_, pool) = setup_string_pool(pool_size);
        assert_eq!(pool.count().unwrap(), pool_size);

        pool.clear();

        assert_eq!(pool.count().unwrap(), 0);
    }

    #[test]
    fn test_resource_pool_gives_back_fresh_resource() {
        let pool_size = 10;
        let (_, pool) = setup_string_pool(pool_size);
        assert_eq!(pool.count().unwrap(), pool_size);

        let mut resource_item = pool.acquire_resource(SHORT_TIMEOUT).unwrap();
        assert_eq!(pool.count().unwrap(), pool_size - 1);

        pool.give_back_resource(resource_item.take().unwrap(), pool.discriminant().unwrap())
            .unwrap();

        assert_eq!(pool.count().unwrap(), pool_size);
    }

    #[test]
    fn test_resource_pool_gives_back_resource_automatically() {
        let pool_size = 10;
        let (_, pool) = setup_string_pool(pool_size);
        assert_eq!(pool.count().unwrap(), pool_size);

        {
            let _resource_item = pool.acquire_resource(SHORT_TIMEOUT).unwrap();
            assert_eq!(pool.count().unwrap(), pool_size - 1);
        }

        assert_eq!(pool.count().unwrap(), pool_size);
    }

    #[test]
    fn test_resource_pool_does_not_give_back_resource_when_pool_is_full() {
        let pool_size = 10;
        let (_, pool) = setup_string_pool(pool_size);
        assert_eq!(pool.count().unwrap(), pool_size);

        pool.give_back_resource("resource".to_string(), pool.discriminant().unwrap())
            .unwrap();

        assert_eq!(pool.count().unwrap(), pool_size);
    }

    #[test]
    fn test_resource_pool_does_not_give_back_stale_resource() {
        let pool_size = 10;
        let (_, pool) = setup_string_pool(pool_size);
        assert_eq!(pool.count().unwrap(), pool_size);

        let mut resource_item = pool.acquire_resource(SHORT_TIMEOUT).unwrap();
        assert_eq!(pool.count().unwrap(), pool_size - 1);

        let discriminant_stale = pool.discriminant().unwrap();
        pool.set_discriminant(pool.discriminant().unwrap() + 1)
            .unwrap();
        pool.give_back_resource(resource_item.take().unwrap(), discriminant_stale)
            .unwrap();

        assert_eq!(pool.count().unwrap(), pool_size - 1);
    }

    #[test]
    fn test_resource_pool_gives_back_fresh_resource_pool_item_if_not_taken_yet() {
        let pool_size = 10;
        let (_, pool) = setup_string_pool(pool_size);
        assert_eq!(pool.count().unwrap(), pool_size);

        let resource_item = pool.acquire_resource(SHORT_TIMEOUT).unwrap();
        assert_eq!(pool.count().unwrap(), pool_size - 1);

        pool.give_back_resource_pool_item(resource_item).unwrap();

        assert_eq!(pool.count().unwrap(), pool_size);
    }

    #[test]
    fn test_resource_pool_does_not_give_back_fresh_resource_pool_item_if_already_taken() {
        let pool_size = 10;
        let (_, pool) = setup_string_pool(pool_size);
        assert_eq!(pool.count().unwrap(), pool_size);

        {
            let mut resource_item = pool.acquire_resource(SHORT_TIMEOUT).unwrap();
            assert_eq!(pool.count().unwrap(), pool_size - 1);

            let _resource = resource_item.take().unwrap();
            pool.give_back_resource_pool_item(resource_item).unwrap();
        }

        assert_eq!(pool.count().unwrap(), pool_size - 1);
    }

    #[test]
    fn test_resource_pool_is_reset_when_given_back() {
        let pool = ResourcePool::<TestResetResource>::new(1, vec![TestResetResource::default()]);

        let mut resource_item = pool.acquire_resource(SHORT_TIMEOUT).unwrap();
        let resource = resource_item.take().unwrap();
        pool.give_back_resource(resource, pool.discriminant().unwrap())
            .unwrap();

        let resource_item = pool.acquire_resource(SHORT_TIMEOUT).unwrap();
        assert!(resource_item.reset);
    }

    #[test]
    fn test_resource_pool_item_is_reset_when_given_back() {
        let pool = ResourcePool::<TestResetResource>::new(1, vec![TestResetResource::default()]);

        let resource_item = pool.acquire_resource(SHORT_TIMEOUT).unwrap();
        pool.give_back_resource_pool_item(resource_item).unwrap();

        let resource_item = pool.acquire_resource(SHORT_TIMEOUT).unwrap();
        assert!(resource_item.reset);
    }
}