Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for customized future executor #144

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 43 additions & 9 deletions bb8/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
use crate::inner::PoolInner;
use crate::internals::Conn;
pub use crate::internals::State;
use async_trait::async_trait;
use std::borrow::Cow;
use std::error;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::time::Duration;

use async_trait::async_trait;

use crate::inner::PoolInner;
use crate::internals::Conn;
pub use crate::internals::State;
/// An executor of futures.
gaoqiangz marked this conversation as resolved.
Show resolved Hide resolved
pub trait Executor: Sync + Send + 'static {
/// Place the future into the executor to be run.
fn execute(&self, fut: Pin<Box<dyn Future<Output = ()> + Send>>);
gaoqiangz marked this conversation as resolved.
Show resolved Hide resolved
}

/// A generic connection pool.
pub struct Pool<M>
Expand Down Expand Up @@ -253,7 +259,7 @@ impl<M: ManageConnection> Builder<M> {
self
}

fn build_inner(self, manager: M) -> Pool<M> {
fn build_inner<E: Executor>(self, manager: M, executor: E) -> Pool<M> {
if let Some(min_idle) = self.min_idle {
assert!(
self.max_size >= min_idle,
Expand All @@ -262,7 +268,7 @@ impl<M: ManageConnection> Builder<M> {
}

Pool {
inner: PoolInner::new(self, manager),
inner: PoolInner::new(self, manager, executor),
}
}

Expand All @@ -271,7 +277,19 @@ impl<M: ManageConnection> Builder<M> {
/// The `Pool` will not be returned until it has established its configured
/// minimum number of connections, or it times out.
pub async fn build(self, manager: M) -> Result<Pool<M>, M::Error> {
let pool = self.build_inner(manager);
self.build_with_executor(manager, TokioExecutor).await
}

/// Consumes the builder with the specified executor, returning a new, initialized `Pool`.
///
/// The `Pool` will not be returned until it has established its configured
/// minimum number of connections, or it times out.
pub async fn build_with_executor<E: Executor>(
self,
manager: M,
executor: E,
) -> Result<Pool<M>, M::Error> {
let pool = self.build_inner(manager, executor);
pool.inner.start_connections().await.map(|()| pool)
}

Expand All @@ -280,12 +298,28 @@ impl<M: ManageConnection> Builder<M> {
/// Unlike `build`, this does not wait for any connections to be established
/// before returning.
pub fn build_unchecked(self, manager: M) -> Pool<M> {
let p = self.build_inner(manager);
self.build_unchecked_with_executor(manager, TokioExecutor)
}

/// Consumes the builder with the specified executor, returning a new, initialized `Pool`.
///
/// Unlike `build`, this does not wait for any connections to be established
/// before returning.
pub fn build_unchecked_with_executor<E: Executor>(self, manager: M, executor: E) -> Pool<M> {
let p = self.build_inner(manager, executor);
p.inner.spawn_start_connections();
p
}
}

struct TokioExecutor;

impl Executor for TokioExecutor {
fn execute(&self, fut: Pin<Box<dyn Future<Output = ()> + Send>>) {
tokio::spawn(fut);
}
}

/// A trait which provides connection-specific functionality.
#[async_trait]
pub trait ManageConnection: Sized + Send + Sync + 'static {
Expand Down
34 changes: 17 additions & 17 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
use crate::api::{Builder, Executor, ManageConnection, PooledConnection, RunError};
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, State};
use futures_channel::oneshot;
use futures_util::stream::{FuturesUnordered, StreamExt};
gaoqiangz marked this conversation as resolved.
Show resolved Hide resolved
use futures_util::TryFutureExt;
use std::cmp::{max, min};
use std::fmt;
use std::future::Future;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

use futures_channel::oneshot;
use futures_util::stream::{FuturesUnordered, StreamExt};
use futures_util::TryFutureExt;
use tokio::spawn;
use tokio::time::{interval_at, sleep, timeout, Interval};

use crate::api::{Builder, ManageConnection, PooledConnection, RunError};
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, State};

pub(crate) struct PoolInner<M>
where
M: ManageConnection + Send,
Expand All @@ -24,15 +21,15 @@ impl<M> PoolInner<M>
where
M: ManageConnection + Send,
{
pub(crate) fn new(builder: Builder<M>, manager: M) -> Self {
let inner = Arc::new(SharedPool::new(builder, manager));
pub(crate) fn new<E: Executor>(builder: Builder<M>, manager: M, executor: E) -> Self {
let inner = Arc::new(SharedPool::new(builder, manager, executor));

if inner.statics.max_lifetime.is_some() || inner.statics.idle_timeout.is_some() {
let s = Arc::downgrade(&inner);
if let Some(shared) = s.upgrade() {
let start = Instant::now() + shared.statics.reaper_rate;
let interval = interval_at(start.into(), shared.statics.reaper_rate);
schedule_reaping(interval, s);
schedule_reaping(&inner.executor, interval, s);
}
}

Expand All @@ -59,15 +56,15 @@ where
}

let this = self.clone();
spawn(async move {
self.inner.executor.execute(Box::pin(async move {
let mut stream = this.replenish_idle_connections(approvals);
while let Some(result) = stream.next().await {
match result {
Ok(()) => {}
Err(e) => this.inner.statics.error_sink.sink(e),
}
}
});
}));
}

fn replenish_idle_connections(
Expand Down Expand Up @@ -254,11 +251,14 @@ where
}
}

fn schedule_reaping<M>(mut interval: Interval, weak_shared: Weak<SharedPool<M>>)
where
fn schedule_reaping<M>(
executor: &Box<dyn Executor>,
gaoqiangz marked this conversation as resolved.
Show resolved Hide resolved
mut interval: Interval,
weak_shared: Weak<SharedPool<M>>,
) where
M: ManageConnection,
{
spawn(async move {
executor.execute(Box::pin(async move {
loop {
let _ = interval.tick().await;
if let Some(inner) = weak_shared.upgrade() {
Expand All @@ -267,5 +267,5 @@ where
break;
}
}
});
}));
}
14 changes: 7 additions & 7 deletions bb8/src/internals.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::cmp::min;
use std::sync::Arc;
use std::time::Instant;

use crate::api::{Builder, Executor, ManageConnection};
use futures_channel::oneshot;
use parking_lot::Mutex;

use crate::api::{Builder, ManageConnection};
use std::cmp::min;
gaoqiangz marked this conversation as resolved.
Show resolved Hide resolved
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Instant;

/// The guts of a `Pool`.
#[allow(missing_debug_implementations)]
Expand All @@ -15,6 +13,7 @@ where
M: ManageConnection + Send,
{
pub(crate) statics: Builder<M>,
pub(crate) executor: Box<dyn Executor + 'static>,
pub(crate) manager: M,
pub(crate) internals: Mutex<PoolInternals<M>>,
}
Expand All @@ -23,11 +22,12 @@ impl<M> SharedPool<M>
where
M: ManageConnection + Send,
{
pub(crate) fn new(statics: Builder<M>, manager: M) -> Self {
pub(crate) fn new<E: Executor>(statics: Builder<M>, manager: M, executor: E) -> Self {
Self {
statics,
manager,
internals: Mutex::new(PoolInternals::default()),
executor: Box::new(executor),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion bb8/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

mod api;
pub use api::{
Builder, CustomizeConnection, ErrorSink, ManageConnection, NopErrorSink, Pool,
Builder, CustomizeConnection, ErrorSink, Executor, ManageConnection, NopErrorSink, Pool,
PooledConnection, RunError, State,
};

Expand Down