Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for customized future executor #144

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions bb8/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[package]
name = "bb8"
version = "0.8.0"
description = "Full-featured async (tokio-based) connection pool (like r2d2)"
version = "0.8.0-rc.1"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I won't be able to merge this PR with this stuff in it, please revert these changes.

description = "[FORK] Full-featured async (tokio-based) connection pool (like r2d2)"
license = "MIT"
repository = "https://github.com/djc/bb8"
repository = "https://github.com/gaoqiangz/bb8"
edition = "2021"
workspace = ".."
readme = "../README.md"
Expand Down
44 changes: 40 additions & 4 deletions bb8/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::borrow::Cow;
use std::error;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::time::Duration;

use async_trait::async_trait;
Expand Down Expand Up @@ -253,7 +255,7 @@ impl<M: ManageConnection> Builder<M> {
self
}

fn build_inner(self, manager: M) -> Pool<M> {
fn build_inner<E: Executor>(self, manager: M, executor: E) -> Pool<M> {
if let Some(min_idle) = self.min_idle {
assert!(
self.max_size >= min_idle,
Expand All @@ -262,7 +264,7 @@ impl<M: ManageConnection> Builder<M> {
}

Pool {
inner: PoolInner::new(self, manager),
inner: PoolInner::new(self, manager, executor),
}
}

Expand All @@ -271,7 +273,19 @@ impl<M: ManageConnection> Builder<M> {
/// The `Pool` will not be returned until it has established its configured
/// minimum number of connections, or it times out.
pub async fn build(self, manager: M) -> Result<Pool<M>, M::Error> {
let pool = self.build_inner(manager);
self.build_with_executor(manager, TokioExecutor).await
}

/// Consumes the builder with the specified executor, returning a new, initialized `Pool`.
///
/// The `Pool` will not be returned until it has established its configured
/// minimum number of connections, or it times out.
pub async fn build_with_executor<E: Executor>(
self,
manager: M,
executor: E,
) -> Result<Pool<M>, M::Error> {
let pool = self.build_inner(manager, executor);
pool.inner.start_connections().await.map(|()| pool)
}

Expand All @@ -280,7 +294,15 @@ impl<M: ManageConnection> Builder<M> {
/// Unlike `build`, this does not wait for any connections to be established
/// before returning.
pub fn build_unchecked(self, manager: M) -> Pool<M> {
let p = self.build_inner(manager);
self.build_unchecked_with_executor(manager, TokioExecutor)
}

/// Consumes the builder with the specified executor, returning a new, initialized `Pool`.
///
/// Unlike `build`, this does not wait for any connections to be established
/// before returning.
pub fn build_unchecked_with_executor<E: Executor>(self, manager: M, executor: E) -> Pool<M> {
let p = self.build_inner(manager, executor);
p.inner.spawn_start_connections();
p
}
Expand Down Expand Up @@ -456,3 +478,17 @@ impl<E> ErrorSink<E> for NopErrorSink {
Box::new(*self)
}
}

/// An executor of futures.
pub trait Executor: Sync + Send + 'static {
/// Place the future into the executor to be run.
fn execute(&self, fut: Pin<Box<dyn Future<Output = ()> + Send>>);
}

struct TokioExecutor;

impl Executor for TokioExecutor {
fn execute(&self, fut: Pin<Box<dyn Future<Output = ()> + Send>>) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would prefer to call this spawn().

tokio::spawn(fut);
}
}
26 changes: 14 additions & 12 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ use std::fmt;
use std::future::Future;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use tokio::time::{interval_at, sleep, timeout, Interval};

use futures_channel::oneshot;
use futures_util::stream::{FuturesUnordered, StreamExt};
use futures_util::TryFutureExt;
use tokio::spawn;
use tokio::time::{interval_at, sleep, timeout, Interval};

use crate::api::{Builder, ManageConnection, PooledConnection, RunError};
use crate::api::{Builder, Executor, ManageConnection, PooledConnection, RunError};
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, State};

pub(crate) struct PoolInner<M>
Expand All @@ -24,15 +23,15 @@ impl<M> PoolInner<M>
where
M: ManageConnection + Send,
{
pub(crate) fn new(builder: Builder<M>, manager: M) -> Self {
let inner = Arc::new(SharedPool::new(builder, manager));
pub(crate) fn new<E: Executor>(builder: Builder<M>, manager: M, executor: E) -> Self {
let inner = Arc::new(SharedPool::new(builder, manager, executor));

if inner.statics.max_lifetime.is_some() || inner.statics.idle_timeout.is_some() {
let s = Arc::downgrade(&inner);
if let Some(shared) = s.upgrade() {
let start = Instant::now() + shared.statics.reaper_rate;
let interval = interval_at(start.into(), shared.statics.reaper_rate);
schedule_reaping(interval, s);
schedule_reaping(inner.executor.as_ref(), interval, s);
}
}

Expand All @@ -59,15 +58,15 @@ where
}

let this = self.clone();
spawn(async move {
self.inner.executor.execute(Box::pin(async move {
let mut stream = this.replenish_idle_connections(approvals);
while let Some(result) = stream.next().await {
match result {
Ok(()) => {}
Err(e) => this.inner.statics.error_sink.sink(e),
}
}
});
}));
}

fn replenish_idle_connections(
Expand Down Expand Up @@ -254,11 +253,14 @@ where
}
}

fn schedule_reaping<M>(mut interval: Interval, weak_shared: Weak<SharedPool<M>>)
where
fn schedule_reaping<M>(
executor: &dyn Executor,
mut interval: Interval,
weak_shared: Weak<SharedPool<M>>,
) where
M: ManageConnection,
{
spawn(async move {
executor.execute(Box::pin(async move {
loop {
let _ = interval.tick().await;
if let Some(inner) = weak_shared.upgrade() {
Expand All @@ -267,5 +269,5 @@ where
break;
}
}
});
}));
}
8 changes: 5 additions & 3 deletions bb8/src/internals.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::cmp::min;
gaoqiangz marked this conversation as resolved.
Show resolved Hide resolved
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Instant;

use futures_channel::oneshot;
use parking_lot::Mutex;

use crate::api::{Builder, ManageConnection};
use std::collections::VecDeque;
use crate::api::{Builder, Executor, ManageConnection};

/// The guts of a `Pool`.
#[allow(missing_debug_implementations)]
Expand All @@ -15,6 +15,7 @@ where
M: ManageConnection + Send,
{
pub(crate) statics: Builder<M>,
pub(crate) executor: Box<dyn Executor>,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need a trait object? It would be nice if we didn't have to Box every future just to support this fairly niche use case.

pub(crate) manager: M,
pub(crate) internals: Mutex<PoolInternals<M>>,
}
Expand All @@ -23,11 +24,12 @@ impl<M> SharedPool<M>
where
M: ManageConnection + Send,
{
pub(crate) fn new(statics: Builder<M>, manager: M) -> Self {
pub(crate) fn new<E: Executor>(statics: Builder<M>, manager: M, executor: E) -> Self {
Self {
statics,
manager,
internals: Mutex::new(PoolInternals::default()),
executor: Box::new(executor),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion bb8/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

mod api;
pub use api::{
Builder, CustomizeConnection, ErrorSink, ManageConnection, NopErrorSink, Pool,
Builder, CustomizeConnection, ErrorSink, Executor, ManageConnection, NopErrorSink, Pool,
PooledConnection, RunError, State,
};

Expand Down