Provide statistics of gets and contention
Two new attributes are added to the `State` object to tell the user how many gets were
performed and how many of them suffered from contention.

By computing `gets_waited / gets`, the user can tell whether the connection pool is
underprovisioned or whether the minimum number of idle connections is not properly
configured.
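
As a rough sketch of how a caller might consume the new counters (the `contention_ratio` helper below is hypothetical and not part of this change):

```rust
use bb8::{ManageConnection, Pool};

/// Hypothetical helper: returns the fraction of `get()` calls that had to
/// wait for a connection. A value close to 1.0 suggests the pool is
/// underprovisioned or that `min_idle` is set too low.
fn contention_ratio<M: ManageConnection>(pool: &Pool<M>) -> f64 {
    let state = pool.state();
    if state.gets == 0 {
        return 0.0;
    }
    f64::from(state.gets_waited) / f64::from(state.gets)
}
```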
pfreixes committed Apr 18, 2024
1 parent cd5c510 commit 4f1caff
Showing 4 changed files with 117 additions and 10 deletions.
2 changes: 1 addition & 1 deletion bb8/src/api.rs
@@ -9,7 +9,7 @@ use async_trait::async_trait;

use crate::inner::PoolInner;
use crate::internals::Conn;
pub use crate::internals::State;
pub use crate::inner::State;

/// A generic connection pool.
pub struct Pool<M>
78 changes: 72 additions & 6 deletions bb8/src/inner.rs
@@ -2,6 +2,7 @@ use std::cmp::{max, min};
use std::fmt;
use std::future::Future;
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};

use futures_util::stream::{FuturesUnordered, StreamExt};
@@ -10,13 +11,36 @@ use tokio::spawn;
use tokio::time::{interval_at, sleep, timeout, Interval};

use crate::api::{Builder, ConnectionState, ManageConnection, PooledConnection, RunError};
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, State};
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool};

struct SharedPoolInnerStats {
    gets: AtomicU32,
    gets_waited: AtomicU32,
}

impl SharedPoolInnerStats {
    fn new() -> Self {
        Self {
            gets: AtomicU32::new(0),
            gets_waited: AtomicU32::new(0),
        }
    }

    fn record_get(&self, with_contention: bool) {
        self.gets.fetch_add(1, Ordering::SeqCst);

        if with_contention {
            self.gets_waited.fetch_add(1, Ordering::SeqCst);
        }
    }
}

pub(crate) struct PoolInner<M>
where
    M: ManageConnection + Send,
{
    inner: Arc<SharedPool<M>>,
    pool_inner_stats: Arc<SharedPoolInnerStats>,
}

impl<M> PoolInner<M>
@@ -25,6 +49,7 @@ where
{
    pub(crate) fn new(builder: Builder<M>, manager: M) -> Self {
        let inner = Arc::new(SharedPool::new(builder, manager));
        let pool_inner_stats = Arc::new(SharedPoolInnerStats::new());

        if inner.statics.max_lifetime.is_some() || inner.statics.idle_timeout.is_some() {
            let start = Instant::now() + inner.statics.reaper_rate;
@@ -33,12 +58,16 @@ where
                Reaper {
                    interval,
                    pool: Arc::downgrade(&inner),
                    pool_inner_stats: Arc::downgrade(&pool_inner_stats),
                }
                .run(),
            );
        }

        Self { inner }
        Self {
            inner,
            pool_inner_stats,
        }
    }

    pub(crate) async fn start_connections(&self) -> Result<(), M::Error> {
@@ -85,6 +114,8 @@ where
    }

    pub(crate) async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
        let mut with_contention = false;

        let future = async {
            loop {
                let (conn, approvals) = self.inner.pop();
@@ -96,6 +127,7 @@ where
                let mut conn = match conn {
                    Some(conn) => PooledConnection::new(self, conn),
                    None => {
                        with_contention = true;
                        self.inner.notify.notified().await;
                        continue;
                    }
@@ -116,10 +148,14 @@ where
            }
        };

        match timeout(self.inner.statics.connection_timeout, future).await {
        let result = match timeout(self.inner.statics.connection_timeout, future).await {
            Ok(result) => result,
            _ => Err(RunError::TimedOut),
        }
        };

        self.pool_inner_stats.record_get(with_contention);

        result
    }

    pub(crate) async fn connect(&self) -> Result<M::Connection, M::Error> {
@@ -148,7 +184,16 @@ where

    /// Returns information about the current state of the pool.
    pub(crate) fn state(&self) -> State {
        self.inner.internals.lock().state()
        let gets: u32 = self.pool_inner_stats.gets.load(Ordering::SeqCst);
        let gets_waited: u32 = self.pool_inner_stats.gets_waited.load(Ordering::SeqCst);
        let pool_internal_state = self.inner.internals.lock().state();

        State {
            gets,
            gets_waited,
            connections: pool_internal_state.connections,
            idle_connections: pool_internal_state.idle_connections,
        }
    }

// Outside of Pool to avoid borrow splitting issues on self
@@ -212,6 +257,7 @@ where
    fn clone(&self) -> Self {
        PoolInner {
            inner: self.inner.clone(),
            pool_inner_stats: self.pool_inner_stats.clone(),
        }
    }
}
@@ -228,14 +274,15 @@ where
struct Reaper<M: ManageConnection> {
    interval: Interval,
    pool: Weak<SharedPool<M>>,
    pool_inner_stats: Weak<SharedPoolInnerStats>,
}

impl<M: ManageConnection> Reaper<M> {
    async fn run(mut self) {
        loop {
            let _ = self.interval.tick().await;
            let pool = match self.pool.upgrade() {
                Some(inner) => PoolInner { inner },
                Some(inner) => PoolInner {
                    inner,
                    pool_inner_stats: self.pool_inner_stats.upgrade().unwrap(),
                },
                None => break,
            };

@@ -244,3 +291,22 @@ impl<M: ManageConnection> Reaper<M> {
        }
    }
}

/// Information about the state of a `Pool`.
#[derive(Debug)]
#[non_exhaustive]
pub struct State {
    /// Information about gets.
    /// The total number of gets performed. Note that the counter can
    /// overflow and wrap around to 0 eventually.
    pub gets: u32,
    /// The number of gets that had to wait for a connection to become
    /// available. This counter can also overflow and wrap around to 0
    /// eventually.
    pub gets_waited: u32,
    /// Information about the connections.
    /// The number of connections currently being managed by the pool.
    pub connections: u32,
    /// The number of idle connections.
    pub idle_connections: u32,
}
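
Because both counters are `u32` values that wrap on overflow, code that samples `state()` periodically may want to compute per-interval deltas with wrapping arithmetic. A minimal sketch (the helper below is hypothetical, not part of this change):

```rust
/// Hypothetical helper: delta between two samples of a wrapping u32 counter.
/// Correct even if the counter wrapped past `u32::MAX` once between samples.
fn counter_delta(previous: u32, current: u32) -> u32 {
    current.wrapping_sub(previous)
}
```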
6 changes: 3 additions & 3 deletions bb8/src/internals.rs
@@ -154,8 +154,8 @@ where
        self.dropped((before - self.conns.len()) as u32, config)
    }

    pub(crate) fn state(&self) -> State {
        State {
    pub(crate) fn state(&self) -> PoolInternalState {
        PoolInternalState {
            connections: self.num_conns,
            idle_connections: self.conns.len() as u32,
        }
@@ -249,7 +249,7 @@ impl<C: Send> From<Conn<C>> for IdleConn<C> {
/// Information about the state of a `Pool`.
#[derive(Debug)]
#[non_exhaustive]
pub struct State {
pub struct PoolInternalState {
    /// The number of connections currently being managed by the pool.
    pub connections: u32,
    /// The number of idle connections.
41 changes: 41 additions & 0 deletions bb8/tests/test.rs
@@ -885,3 +885,44 @@ async fn test_broken_connections_dont_starve_pool() {
        future.await.unwrap();
    }
}

#[tokio::test]
async fn test_state_get_contention() {
    let pool = Pool::builder()
        .max_size(1)
        .min_idle(1)
        .build(OkManager::<FakeConnection>::new())
        .await
        .unwrap();

    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();
    let clone = pool.clone();
    tokio::spawn(async move {
        let conn = clone.get().await.unwrap();
        tx1.send(()).unwrap();
        let _ = rx2
            .then(|r| match r {
                Ok(v) => ok((v, conn)),
                Err(_) => err((Error, conn)),
            })
            .await;
    });

    // Wait until the spawned task has checked out the first connection.
    rx1.await.unwrap();

    // Now try to get a new connection without waiting.
    let f = pool.get();

    // Release the first connection.
    tx2.send(()).unwrap();

    // Wait for the second attempt to get a connection.
    f.await.unwrap();

    let state = pool.state();
    assert_eq!(state.gets, 2);
    assert_eq!(state.gets_waited, 1);
}
