diff --git a/sqlx-bench/benches/pg_pool.rs b/sqlx-bench/benches/pg_pool.rs index 793b7e8044..906bb8a47e 100644 --- a/sqlx-bench/benches/pg_pool.rs +++ b/sqlx-bench/benches/pg_pool.rs @@ -26,7 +26,7 @@ fn do_bench_acquire(b: &mut Bencher, concurrent: u32, fair: bool) { let pool = sqlx_rt::block_on( PgPoolOptions::new() // we don't want timeouts because we want to see how the pool degrades - .connect_timeout(Duration::from_secs(3600)) + .acquire_timeout(Duration::from_secs(3600)) // force the pool to start full .min_connections(50) .max_connections(50) diff --git a/sqlx-core/src/any/connection/mod.rs b/sqlx-core/src/any/connection/mod.rs index 6f8d1e3860..f057551db5 100644 --- a/sqlx-core/src/any/connection/mod.rs +++ b/sqlx-core/src/any/connection/mod.rs @@ -136,6 +136,22 @@ impl Connection for AnyConnection { } } + fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> { + match self.0 { + #[cfg(feature = "postgres")] + AnyConnectionKind::Postgres(conn) => conn.close_hard(), + + #[cfg(feature = "mysql")] + AnyConnectionKind::MySql(conn) => conn.close_hard(), + + #[cfg(feature = "sqlite")] + AnyConnectionKind::Sqlite(conn) => conn.close_hard(), + + #[cfg(feature = "mssql")] + AnyConnectionKind::Mssql(conn) => conn.close_hard(), + } + } + fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> { delegate_to_mut!(self.ping()) } diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index 4fb103a2ee..80469ea3e5 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -20,6 +20,12 @@ pub trait Connection: Send { /// will be faster at cleaning up resources. fn close(self) -> BoxFuture<'static, Result<(), Error>>; + /// Immediately close the connection without sending a graceful shutdown. + /// + /// This should still at least send a TCP `FIN` frame to let the server know we're dying. + #[doc(hidden)] + fn close_hard(self) -> BoxFuture<'static, Result<(), Error>>; + /// Checks if a connection to the database is still valid. fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>>; diff --git a/sqlx-core/src/error.rs b/sqlx-core/src/error.rs index cd044bc0d4..cc8ff523a6 100644 --- a/sqlx-core/src/error.rs +++ b/sqlx-core/src/error.rs @@ -165,6 +165,11 @@ pub trait DatabaseError: 'static + Send + Sync + StdError { #[doc(hidden)] fn into_error(self: Box) -> Box; + #[doc(hidden)] + fn is_transient_in_connect_phase(&self) -> bool { + false + } + /// Returns the name of the constraint that triggered the error, if applicable. /// If the error was caused by a conflict of a unique index, this will be the index name. 
/// diff --git a/sqlx-core/src/mssql/connection/mod.rs b/sqlx-core/src/mssql/connection/mod.rs index 38c577d062..9df50b1a93 100644 --- a/sqlx-core/src/mssql/connection/mod.rs +++ b/sqlx-core/src/mssql/connection/mod.rs @@ -55,6 +55,10 @@ impl Connection for MssqlConnection { } } + fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> { + self.close() + } + fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> { // NOTE: we do not use `SELECT 1` as that *could* interact with any ongoing transactions self.execute("/* SQLx ping */").map_ok(|_| ()).boxed() diff --git a/sqlx-core/src/mysql/connection/mod.rs b/sqlx-core/src/mysql/connection/mod.rs index 4ade06beeb..1f87eaa918 100644 --- a/sqlx-core/src/mysql/connection/mod.rs +++ b/sqlx-core/src/mysql/connection/mod.rs @@ -56,6 +56,13 @@ impl Connection for MySqlConnection { }) } + fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> { + Box::pin(async move { + self.stream.shutdown().await?; + Ok(()) + }) + } + fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> { Box::pin(async move { self.stream.wait_until_ready().await?; diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index c5c51ab5d3..74458a56bb 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -1,7 +1,7 @@ use std::fmt::{self, Debug, Formatter}; use std::ops::{Deref, DerefMut}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use futures_intrusive::sync::SemaphoreReleaser; @@ -9,7 +9,8 @@ use crate::connection::Connection; use crate::database::Database; use crate::error::Error; -use super::inner::{DecrementSizeGuard, SharedPool}; +use super::inner::{DecrementSizeGuard, PoolInner}; +use crate::pool::options::PoolConnectionMetadata; use std::future::Future; /// A connection managed by a [`Pool`][crate::pool::Pool]. @@ -17,17 +18,17 @@ use std::future::Future; /// Will be returned to the pool on-drop. pub struct PoolConnection { live: Option>, - pub(crate) pool: Arc>, + pub(crate) pool: Arc>, } pub(super) struct Live { pub(super) raw: DB::Connection, - pub(super) created: Instant, + pub(super) created_at: Instant, } pub(super) struct Idle { pub(super) live: Live, - pub(super) since: Instant, + pub(super) idle_since: Instant, } /// RAII wrapper for connections being handled by functions that may drop them @@ -36,7 +37,7 @@ pub(super) struct Floating { pub(super) guard: DecrementSizeGuard, } -const DEREF_ERR: &str = "(bug) connection already released to pool"; +const EXPECT_MSG: &str = "BUG: inner connection already taken!"; impl Debug for PoolConnection { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { @@ -49,13 +50,13 @@ impl Deref for PoolConnection { type Target = DB::Connection; fn deref(&self) -> &Self::Target { - &self.live.as_ref().expect(DEREF_ERR).raw + &self.live.as_ref().expect(EXPECT_MSG).raw } } impl DerefMut for PoolConnection { fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.live.as_mut().expect(DEREF_ERR).raw + &mut self.live.as_mut().expect(EXPECT_MSG).raw } } @@ -72,25 +73,20 @@ impl AsMut for PoolConnection { } impl PoolConnection { - /// Explicitly release a connection from the pool - #[deprecated = "renamed to `.detach()` for clarity"] - pub fn release(self) -> DB::Connection { - self.detach() - } - /// Detach this connection from the pool, allowing it to open a replacement. /// /// Note that if your application uses a single shared pool, this - /// effectively lets the application exceed the `max_connections` setting. 
+ /// effectively lets the application exceed the [`max_connections`] setting. + /// + /// If [`min_connections`] is nonzero, a task will be spawned to replace this connection. /// /// If you want the pool to treat this connection as permanently checked-out, /// use [`.leak()`][Self::leak] instead. + /// + /// [`max_connections`]: crate::pool::PoolOptions::max_connections + /// [`min_connections`]: crate::pool::PoolOptions::min_connections pub fn detach(mut self) -> DB::Connection { - self.live - .take() - .expect("PoolConnection double-dropped") - .float(self.pool.clone()) - .detach() + self.take_live().float(self.pool.clone()).detach() } /// Detach this connection from the pool, treating it as permanently checked-out. @@ -99,7 +95,11 @@ impl PoolConnection { /// /// If you don't want to impact the pool's capacity, use [`.detach()`][Self::detach] instead. pub fn leak(mut self) -> DB::Connection { - self.live.take().expect("PoolConnection double-dropped").raw + self.take_live().raw + } + + fn take_live(&mut self) -> Live { + self.live.take().expect(EXPECT_MSG) } /// Test the connection to make sure it is still live before returning it to the pool. @@ -109,34 +109,21 @@ impl PoolConnection { // float the connection in the pool before we move into the task // in case the returned `Future` isn't executed, like if it's spawned into a dying runtime // https://github.com/launchbadge/sqlx/issues/1396 - let floating = self.live.take().map(|live| live.float(self.pool.clone())); + // Type hints seem to be broken by `Option` combinators in IntelliJ Rust right now (6/22). + let floating: Option>> = + self.live.take().map(|live| live.float(self.pool.clone())); + + let pool = self.pool.clone(); async move { - let mut floating = if let Some(floating) = floating { - floating + let returned_to_pool = if let Some(floating) = floating { + floating.return_to_pool().await } else { - return; + false }; - // test the connection on-release to ensure it is still viable, - // and flush anything time-sensitive like transaction rollbacks - // if an Executor future/stream is dropped during an `.await` call, the connection - // is likely to be left in an inconsistent state, in which case it should not be - // returned to the pool; also of course, if it was dropped due to an error - // this is simply a band-aid as SQLx-next connections should be able - // to recover from cancellations - if let Err(e) = floating.raw.ping().await { - log::warn!( - "error occurred while testing the connection on-release: {}", - e - ); - - // we now consider the connection to be broken; just drop it to close - // trying to close gracefully might cause something weird to happen - drop(floating); - } else { - // if the connection is still viable, release it to the pool - floating.release(); + if !returned_to_pool { + pool.min_connections_maintenance(None).await; } } } @@ -145,7 +132,8 @@ impl PoolConnection { /// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from. impl Drop for PoolConnection { fn drop(&mut self) { - if self.live.is_some() { + // We still need to spawn a task to maintain `min_connections`. 
+ if self.live.is_some() || self.pool.options.min_connections > 0 { #[cfg(not(feature = "_rt-async-std"))] if let Ok(handle) = sqlx_rt::Handle::try_current() { handle.spawn(self.return_to_pool()); @@ -158,7 +146,7 @@ impl Drop for PoolConnection { } impl Live { - pub fn float(self, pool: Arc>) -> Floating { + pub fn float(self, pool: Arc>) -> Floating { Floating { inner: self, // create a new guard from a previously leaked permit @@ -169,7 +157,7 @@ impl Live { pub fn into_idle(self) -> Idle { Idle { live: self, - since: Instant::now(), + idle_since: Instant::now(), } } } @@ -193,24 +181,21 @@ impl Floating> { Self { inner: Live { raw: conn, - created: Instant::now(), + created_at: Instant::now(), }, guard, } } - pub fn attach(self, pool: &Arc>) -> PoolConnection { + pub fn reattach(self) -> PoolConnection { let Floating { inner, guard } = self; - debug_assert!( - guard.same_pool(pool), - "BUG: attaching connection to different pool" - ); + let pool = Arc::clone(&guard.pool); guard.cancel(); PoolConnection { live: Some(inner), - pool: Arc::clone(pool), + pool, } } @@ -218,9 +203,66 @@ impl Floating> { self.guard.pool.clone().release(self); } - pub async fn close(self) -> Result<(), Error> { + /// Return the connection to the pool. + /// + /// Returns `true` if the connection was successfully returned, `false` if it was closed. + async fn return_to_pool(mut self) -> bool { + // Immediately close the connection. + if self.guard.pool.is_closed() { + self.close().await; + return false; + } + + if let Some(test) = &self.guard.pool.options.after_release { + let meta = self.metadata(); + match (test)(&mut self.inner.raw, meta).await { + Ok(true) => (), + Ok(false) => { + self.close().await; + return false; + } + Err(e) => { + log::warn!("error from after_release: {}", e); + // Connection is broken, don't try to gracefully close as + // something weird might happen. + self.close_hard().await; + return false; + } + } + } + + // test the connection on-release to ensure it is still viable, + // and flush anything time-sensitive like transaction rollbacks + // if an Executor future/stream is dropped during an `.await` call, the connection + // is likely to be left in an inconsistent state, in which case it should not be + // returned to the pool; also of course, if it was dropped due to an error + // this is simply a band-aid as SQLx-next connections should be able + // to recover from cancellations + if let Err(e) = self.raw.ping().await { + log::warn!( + "error occurred while testing the connection on-release: {}", + e + ); + + // Connection is broken, don't try to gracefully close. 
+ self.close_hard().await; + false + } else { + // if the connection is still viable, release it to the pool + self.release(); + true + } + } + + pub async fn close(self) { + // This isn't used anywhere that we care about the return value + let _ = self.inner.raw.close().await; + // `guard` is dropped as intended - self.inner.raw.close().await + } + + pub async fn close_hard(self) { + let _ = self.inner.raw.close_hard().await; } pub fn detach(self) -> DB::Connection { @@ -233,12 +275,19 @@ impl Floating> { guard: self.guard, } } + + pub fn metadata(&self) -> PoolConnectionMetadata { + PoolConnectionMetadata { + age: self.created_at.elapsed(), + idle_for: Duration::ZERO, + } + } } impl Floating> { pub fn from_idle( idle: Idle, - pool: Arc>, + pool: Arc>, permit: SemaphoreReleaser<'_>, ) -> Self { Self { @@ -259,12 +308,27 @@ impl Floating> { } pub async fn close(self) -> DecrementSizeGuard { - // `guard` is dropped as intended if let Err(e) = self.inner.live.raw.close().await { log::debug!("error occurred while closing the pool connection: {}", e); } self.guard } + + pub async fn close_hard(self) -> DecrementSizeGuard { + let _ = self.inner.live.raw.close_hard().await; + + self.guard + } + + pub fn metadata(&self) -> PoolConnectionMetadata { + // Use a single `now` value for consistency. + let now = Instant::now(); + + PoolConnectionMetadata { + age: self.created_at.duration_since(now), + idle_for: self.idle_since.duration_since(now), + } + } } impl Deref for Floating { diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index ffbd69a89b..86c0ec5d8c 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -3,36 +3,37 @@ use crate::connection::ConnectOptions; use crate::connection::Connection; use crate::database::Database; use crate::error::Error; -use crate::pool::{deadline_as_timeout, PoolOptions}; +use crate::pool::{deadline_as_timeout, CloseEvent, PoolOptions}; use crossbeam_queue::ArrayQueue; use futures_intrusive::sync::{Semaphore, SemaphoreReleaser}; use std::cmp; -use std::mem; -use std::ptr; -use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::future::Future; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; use std::sync::Arc; +use crate::pool::options::PoolConnectionMetadata; use std::time::{Duration, Instant}; -/// Ihe number of permits to release to wake all waiters, such as on `SharedPool::close()`. +/// Ihe number of permits to release to wake all waiters, such as on `PoolInner::close()`. /// /// This should be large enough to realistically wake all tasks waiting on the pool without /// potentially overflowing the permits count in the semaphore itself. 
const WAKE_ALL_PERMITS: usize = usize::MAX / 2; -pub(crate) struct SharedPool { +pub(crate) struct PoolInner { pub(super) connect_options: ::Options, pub(super) idle_conns: ArrayQueue>, pub(super) semaphore: Semaphore, pub(super) size: AtomicU32, + pub(super) num_idle: AtomicUsize, is_closed: AtomicBool, pub(super) on_closed: event_listener::Event, pub(super) options: PoolOptions, } -impl SharedPool { +impl PoolInner { pub(super) fn new_arc( options: PoolOptions, connect_options: ::Options, @@ -50,6 +51,7 @@ impl SharedPool { idle_conns: ArrayQueue::new(capacity), semaphore: Semaphore::new(options.fair, capacity), size: AtomicU32::new(0), + num_idle: AtomicUsize::new(0), is_closed: AtomicBool::new(false), on_closed: event_listener::Event::new(), options, @@ -57,7 +59,7 @@ impl SharedPool { let pool = Arc::new(pool); - spawn_reaper(&pool); + spawn_maintenance_tasks(&pool); pool } @@ -67,15 +69,19 @@ impl SharedPool { } pub(super) fn num_idle(&self) -> usize { - // NOTE: This is very expensive - self.idle_conns.len() + // We don't use `self.idle_conns.len()` as it waits for the internal + // head and tail pointers to stop changing for a moment before calculating the length, + // which may take a long time at high levels of churn. + // + // By maintaining our own atomic count, we avoid that issue entirely. + self.num_idle.load(Ordering::Acquire) } pub(super) fn is_closed(&self) -> bool { self.is_closed.load(Ordering::Acquire) } - pub(super) async fn close(self: &Arc) { + pub(super) fn close<'a>(self: &'a Arc) -> impl Future + 'a { let already_closed = self.is_closed.swap(true, Ordering::AcqRel); if !already_closed { @@ -86,14 +92,28 @@ impl SharedPool { self.on_closed.notify(usize::MAX); } - // wait for all permits to be released - let _permits = self - .semaphore - .acquire(WAKE_ALL_PERMITS + (self.options.max_connections as usize)) - .await; + async move { + // Close any currently idle connections in the pool. + while let Some(idle) = self.idle_conns.pop() { + let _ = idle.live.float((*self).clone()).close().await; + } + + // Wait for all permits to be released. + let _permits = self + .semaphore + .acquire(WAKE_ALL_PERMITS + (self.options.max_connections as usize)) + .await; + + // Clean up any remaining connections. + while let Some(idle) = self.idle_conns.pop() { + let _ = idle.live.float((*self).clone()).close().await; + } + } + } - while let Some(idle) = self.idle_conns.pop() { - let _ = idle.live.float((*self).clone()).close().await; + pub(crate) fn close_event(&self) -> CloseEvent { + CloseEvent { + listener: (!self.is_closed()).then(|| self.on_closed.listen()), } } @@ -112,19 +132,15 @@ impl SharedPool { permit: SemaphoreReleaser<'a>, ) -> Result>, SemaphoreReleaser<'a>> { if let Some(idle) = self.idle_conns.pop() { + self.num_idle.fetch_sub(1, Ordering::AcqRel); Ok(Floating::from_idle(idle, (*self).clone(), permit)) } else { Err(permit) } } - pub(super) fn release(&self, mut floating: Floating>) { - if let Some(test) = &self.options.after_release { - if !test(&mut floating.raw) { - // drop the connection and do not return it to the pool - return; - } - } + pub(super) fn release(&self, floating: Floating>) { + // `options.after_release` is invoked by `PoolConnection::release_to_pool()`. 
let Floating { inner: idle, guard } = floating.into_idle(); @@ -135,11 +151,11 @@ impl SharedPool { // NOTE: we need to make sure we drop the permit *after* we push to the idle queue // don't decrease the size guard.release_permit(); + + self.num_idle.fetch_add(1, Ordering::AcqRel); } /// Try to atomically increment the pool size for a new connection. - /// - /// Returns `None` if we are at max_connections or if the pool is closed. pub(super) fn try_increment_size<'a>( self: &'a Arc, permit: SemaphoreReleaser<'a>, @@ -157,16 +173,15 @@ impl SharedPool { } } - #[allow(clippy::needless_lifetimes)] pub(super) async fn acquire(self: &Arc) -> Result>, Error> { if self.is_closed() { return Err(Error::PoolClosed); } - let deadline = Instant::now() + self.options.connect_timeout; + let deadline = Instant::now() + self.options.acquire_timeout; sqlx_rt::timeout( - self.options.connect_timeout, + self.options.acquire_timeout, async { loop { let permit = self.semaphore.acquire(1).await; @@ -179,7 +194,7 @@ impl SharedPool { let guard = match self.pop_idle(permit) { // Then, check that we can use it... - Ok(conn) => match check_conn(conn, &self.options).await { + Ok(conn) => match check_idle_conn(conn, &self.options).await { // All good! Ok(live) => return Ok(live), @@ -198,7 +213,7 @@ impl SharedPool { }; // Attempt to connect... - return self.connection(deadline, guard).await; + return self.connect(deadline, guard).await; } } ) @@ -206,7 +221,7 @@ impl SharedPool { .map_err(|_| Error::PoolTimedOut)? } - pub(super) async fn connection( + pub(super) async fn connect( self: &Arc, deadline: Instant, guard: DecrementSizeGuard, @@ -226,21 +241,35 @@ impl SharedPool { match sqlx_rt::timeout(timeout, self.connect_options.connect()).await { // successfully established connection Ok(Ok(mut raw)) => { - if let Some(callback) = &self.options.after_connect { - callback(&mut raw).await?; - } + // See comment on `PoolOptions::after_connect` + let meta = PoolConnectionMetadata { + age: Duration::ZERO, + idle_for: Duration::ZERO, + }; + + let res = if let Some(callback) = &self.options.after_connect { + callback(&mut raw, meta).await + } else { + Ok(()) + }; + + match res { + Ok(()) => return Ok(Floating::new_live(raw, guard)), + Err(e) => { + log::error!("error returned from after_connect: {:?}", e); + // The connection is broken, don't try to close nicely. + let _ = raw.close_hard().await; - return Ok(Floating::new_live(raw, guard)); + // Fall through to the backoff. + } + } } // an IO error while connecting is assumed to be the system starting up Ok(Err(Error::Io(e))) if e.kind() == std::io::ErrorKind::ConnectionRefused => (), - // TODO: Handle other database "boot period"s - - // [postgres] the database system is starting up - // TODO: Make this check actually check if this is postgres - Ok(Err(Error::Database(error))) if error.code().as_deref() == Some("57P03") => (), + // We got a transient database error, retry. + Ok(Err(Error::Database(error))) if error.is_transient_in_connect_phase() => (), // Any other error while connection should immediately // terminate and bubble the error up @@ -250,42 +279,86 @@ impl SharedPool { Err(_) => return Err(Error::PoolTimedOut), } - // If the connection is refused wait in exponentially + // If the connection is refused, wait in exponentially // increasing steps for the server to come up, // capped by a factor of the remaining time until the deadline sqlx_rt::sleep(backoff).await; backoff = cmp::min(backoff * 2, max_backoff); } } -} -// NOTE: Function names here are bizarre. 
Helpful help would be appreciated. + /// Try to maintain `min_connections`, returning any errors (including `PoolTimedOut`). + pub async fn try_min_connections(self: &Arc, deadline: Instant) -> Result<(), Error> { + macro_rules! unwrap_or_return { + ($expr:expr) => { + match $expr { + Some(val) => val, + None => return Ok(()), + } + }; + } + + while self.size() < self.options.min_connections { + // Don't wait for a semaphore permit. + // + // If no extra permits are available then we shouldn't be trying to spin up + // connections anyway. + let permit = unwrap_or_return!(self.semaphore.try_acquire(1)); + + // We must always obey `max_connections`. + let guard = unwrap_or_return!(self.try_increment_size(permit).ok()); + + // We skip `after_release` since the connection was never provided to user code + // besides `after_connect`, if they set it. + self.release(self.connect(deadline, guard).await?); + } + + Ok(()) + } + + /// Attempt to maintain `min_connections`, logging if unable. + pub async fn min_connections_maintenance(self: &Arc, deadline: Option) { + let deadline = deadline.unwrap_or_else(|| { + // Arbitrary default deadline if the caller doesn't care. + Instant::now() + Duration::from_secs(300) + }); -fn is_beyond_lifetime(live: &Live, options: &PoolOptions) -> bool { - // check if connection was within max lifetime (or not set) + match self.try_min_connections(deadline).await { + Ok(()) => (), + Err(Error::PoolClosed) => (), + Err(Error::PoolTimedOut) => { + log::debug!("unable to complete `min_connections` maintenance before deadline") + } + Err(e) => log::debug!("error while maintaining min_connections: {:?}", e), + } + } +} + +/// Returns `true` if the connection has exceeded `options.max_lifetime` if set, `false` otherwise. +fn is_beyond_max_lifetime(live: &Live, options: &PoolOptions) -> bool { options .max_lifetime - .map_or(false, |max| live.created.elapsed() > max) + .map_or(false, |max| live.created_at.elapsed() > max) } -fn is_beyond_idle(idle: &Idle, options: &PoolOptions) -> bool { - // if connection wasn't idle too long (or not set) +/// Returns `true` if the connection has exceeded `options.idle_timeout` if set, `false` otherwise. 
+fn is_beyond_idle_timeout(idle: &Idle, options: &PoolOptions) -> bool { options .idle_timeout - .map_or(false, |timeout| idle.since.elapsed() > timeout) + .map_or(false, |timeout| idle.idle_since.elapsed() > timeout) } -async fn check_conn( +async fn check_idle_conn( mut conn: Floating>, options: &PoolOptions, ) -> Result>, DecrementSizeGuard> { // If the connection we pulled has expired, close the connection and // immediately create a new connection - if is_beyond_lifetime(&conn, options) { - // we're closing the connection either way - // close the connection but don't really care about the result + if is_beyond_max_lifetime(&conn, options) { return Err(conn.close().await); - } else if options.test_before_acquire { + } + + if options.test_before_acquire { // Check that the connection is still live if let Err(e) = conn.ping().await { // an error here means the other end has hung up or we lost connectivity @@ -293,18 +366,22 @@ async fn check_conn( // the error itself here isn't necessarily unexpected so WARN is too strong log::info!("ping on idle connection returned error: {}", e); // connection is broken so don't try to close nicely - return Err(conn.close().await); + return Err(conn.close_hard().await); } - } else if let Some(test) = &options.before_acquire { - match test(&mut conn.live.raw).await { + } + + if let Some(test) = &options.before_acquire { + let meta = conn.metadata(); + match test(&mut conn.live.raw, meta).await { Ok(false) => { - // connection was rejected by user-defined hook + // connection was rejected by user-defined hook, close nicely return Err(conn.close().await); } Err(error) => { - log::info!("in `before_acquire`: {}", error); - return Err(conn.close().await); + log::warn!("error from `before_acquire`: {}", error); + // connection is broken so don't try to close nicely + return Err(conn.close_hard().await); } Ok(true) => {} @@ -315,29 +392,53 @@ async fn check_conn( Ok(conn.into_live()) } -/// if `max_lifetime` or `idle_timeout` is set, spawn a task that reaps senescent connections -fn spawn_reaper(pool: &Arc>) { +fn spawn_maintenance_tasks(pool: &Arc>) { + let pool = Arc::clone(&pool); + let period = match (pool.options.max_lifetime, pool.options.idle_timeout) { (Some(it), None) | (None, Some(it)) => it, (Some(a), Some(b)) => cmp::min(a, b), - (None, None) => return, - }; + (None, None) => { + if pool.options.min_connections > 0 { + sqlx_rt::spawn(async move { + pool.min_connections_maintenance(None).await; + }); + } - let pool = Arc::clone(&pool); + return; + } + }; sqlx_rt::spawn(async move { - while !pool.is_closed() { - if !pool.idle_conns.is_empty() { - do_reap(&pool).await; - } - sqlx_rt::sleep(period).await; - } + // Immediately cancel this task if the pool is closed. + let _ = pool + .close_event() + .do_until(async { + while !pool.is_closed() { + let next_run = Instant::now() + period; + + pool.min_connections_maintenance(Some(next_run)).await; + + if let Some(duration) = next_run.checked_duration_since(Instant::now()) { + // `async-std` doesn't have a `sleep_until()` + sqlx_rt::sleep(duration).await; + } else { + sqlx_rt::yield_now().await; + } + + // Don't run the reaper right away. 
+ if !pool.idle_conns.is_empty() { + do_reap(&pool).await; + } + } + }) + .await; }); } -async fn do_reap(pool: &Arc>) { +async fn do_reap(pool: &Arc>) { // reap at most the current size minus the minimum idle let max_reaped = pool.size().saturating_sub(pool.options.min_connections); @@ -346,7 +447,8 @@ async fn do_reap(pool: &Arc>) { // only connections waiting in the queue .filter_map(|_| pool.try_acquire()) .partition::, _>(|conn| { - is_beyond_idle(conn, &pool.options) || is_beyond_lifetime(conn, &pool.options) + is_beyond_idle_timeout(conn, &pool.options) + || is_beyond_max_lifetime(conn, &pool.options) }); for conn in keep { @@ -364,48 +466,43 @@ async fn do_reap(pool: &Arc>) { /// Will decrement the pool size if dropped, to avoid semantically "leaking" connections /// (where the pool thinks it has more connections than it does). pub(in crate::pool) struct DecrementSizeGuard { - pub(crate) pool: Arc>, - dropped: bool, + pub(crate) pool: Arc>, + cancelled: bool, } impl DecrementSizeGuard { /// Create a new guard that will release a semaphore permit on-drop. - pub fn new_permit(pool: Arc>) -> Self { + pub fn new_permit(pool: Arc>) -> Self { Self { pool, - dropped: false, + cancelled: false, } } - pub fn from_permit(pool: Arc>, mut permit: SemaphoreReleaser<'_>) -> Self { + pub fn from_permit(pool: Arc>, mut permit: SemaphoreReleaser<'_>) -> Self { // here we effectively take ownership of the permit permit.disarm(); Self::new_permit(pool) } - /// Return `true` if the internal references point to the same fields in `SharedPool`. - pub fn same_pool(&self, pool: &SharedPool) -> bool { - ptr::eq(&*self.pool, pool) - } - /// Release the semaphore permit without decreasing the pool size. fn release_permit(self) { self.pool.semaphore.release(1); self.cancel(); } - pub fn cancel(self) { - mem::forget(self); + pub fn cancel(mut self) { + self.cancelled = true; } } impl Drop for DecrementSizeGuard { fn drop(&mut self) { - assert!(!self.dropped, "double-dropped!"); - self.dropped = true; - self.pool.size.fetch_sub(1, Ordering::SeqCst); + if !self.cancelled { + self.pool.size.fetch_sub(1, Ordering::AcqRel); - // and here we release the permit we got on construction - self.pool.semaphore.release(1); + // and here we release the permit we got on construction + self.pool.semaphore.release(1); + } } } diff --git a/sqlx-core/src/pool/mod.rs b/sqlx-core/src/pool/mod.rs index 6ca6739144..28a9e1217a 100644 --- a/sqlx-core/src/pool/mod.rs +++ b/sqlx-core/src/pool/mod.rs @@ -54,7 +54,7 @@ //! [`Pool::acquire`] or //! [`Pool::begin`]. -use self::inner::SharedPool; +use self::inner::PoolInner; #[cfg(all( any( feature = "postgres", @@ -91,7 +91,7 @@ mod options; pub use self::connection::PoolConnection; pub(crate) use self::maybe::MaybePoolConnection; -pub use self::options::PoolOptions; +pub use self::options::{PoolConnectionMetadata, PoolOptions}; /// An asynchronous pool of SQLx database connections. /// @@ -176,14 +176,15 @@ pub use self::options::PoolOptions; /// does not _automatically_ save you from the overhead of creating a new connection. /// /// However, because this pool by design enforces _reuse_ of connections, this overhead cost -/// is not paid each and every time you need a connection. In fact you set the `min_connections` -/// option in [PoolOptions], the pool will create that many connections up-front so that they are -/// ready to go when a request comes in. +/// is not paid each and every time you need a connection. 
In fact, if you set +/// [the `min_connections` option in PoolOptions][PoolOptions::min_connections], the pool will +/// create that many connections up-front so that they are ready to go when a request comes in, +/// and maintain that number on a best-effort basis for consistent performance. /// /// ##### 2. Connection Limits (MySQL, MSSQL, Postgres) -/// Database servers usually place hard limits on the number of connections that it allows open at +/// Database servers usually place hard limits on the number of connections that are allowed open at /// any given time, to maintain performance targets and prevent excessive allocation of resources, -/// namely RAM. +/// such as RAM, journal files, disk caches, etc. /// /// These limits have different defaults per database flavor, and may vary between different /// distributions of the same database, but are typically configurable on server start; @@ -240,7 +241,7 @@ pub use self::options::PoolOptions; /// /// Depending on the database server, a connection will have caches for all kinds of other data as /// well and queries will generally benefit from these caches being "warm" (populated with data). -pub struct Pool(pub(crate) Arc>); +pub struct Pool(pub(crate) Arc>); /// A future that resolves when the pool is closed. /// @@ -250,58 +251,112 @@ pub struct CloseEvent { } impl Pool { - /// Creates a new connection pool with a default pool configuration and - /// the given connection URI; and, immediately establishes one connection. - pub async fn connect(uri: &str) -> Result { - PoolOptions::::new().connect(uri).await + /// Create a new connection pool with a default pool configuration and + /// the given connection URL, and immediately establish one connection. + /// + /// Refer to the relevant `ConnectOptions` impl for your database for the expected URL format: + /// + /// * Postgres: [`PgConnectOptions`][crate::postgres::PgConnectOptions] + /// * MySQL: [`MySqlConnectOptions`][crate::mysql::MySqlConnectOptions] + /// * SQLite: [`SqliteConnectOptions`][crate::sqlite::SqliteConnectOptions] + /// * MSSQL: [`MssqlConnectOptions`][crate::mssql::MssqlConnectOptions] + /// + /// The default configuration is mainly suited for testing and light-duty applications. + /// For production applications, you'll likely want to make at least few tweaks. + /// + /// See [`PoolOptions::new()`] for details. + pub async fn connect(url: &str) -> Result { + PoolOptions::::new().connect(url).await } - /// Creates a new connection pool with a default pool configuration and - /// the given connection options; and, immediately establishes one connection. + /// Create a new connection pool with a default pool configuration and + /// the given `ConnectOptions`, and immediately establish one connection. + /// + /// The default configuration is mainly suited for testing and light-duty applications. + /// For production applications, you'll likely want to make at least few tweaks. + /// + /// See [`PoolOptions::new()`] for details. pub async fn connect_with( options: ::Options, ) -> Result { PoolOptions::::new().connect_with(options).await } - /// Creates a new connection pool with a default pool configuration and - /// the given connection URI; and, will establish a connections as the pool - /// starts to be used. - pub fn connect_lazy(uri: &str) -> Result { - PoolOptions::::new().connect_lazy(uri) + /// Create a new connection pool with a default pool configuration and + /// the given connection URL. + /// + /// The pool will establish connections only as needed. 
+    ///
+    /// Refer to the relevant [`ConnectOptions`] impl for your database for the expected URL format:
+    ///
+    /// * Postgres: [`PgConnectOptions`][crate::postgres::PgConnectOptions]
+    /// * MySQL: [`MySqlConnectOptions`][crate::mysql::MySqlConnectOptions]
+    /// * SQLite: [`SqliteConnectOptions`][crate::sqlite::SqliteConnectOptions]
+    /// * MSSQL: [`MssqlConnectOptions`][crate::mssql::MssqlConnectOptions]
+    ///
+    /// The default configuration is mainly suited for testing and light-duty applications.
+    /// For production applications, you'll likely want to make at least a few tweaks.
+    ///
+    /// See [`PoolOptions::new()`] for details.
+    pub fn connect_lazy(url: &str) -> Result<Self, Error> {
+        PoolOptions::<DB>::new().connect_lazy(url)
     }
 
-    /// Creates a new connection pool with a default pool configuration and
-    /// the given connection options; and, will establish a connections as the pool
-    /// starts to be used.
+    /// Create a new connection pool with a default pool configuration and
+    /// the given `ConnectOptions`.
+    ///
+    /// The pool will establish connections only as needed.
+    ///
+    /// The default configuration is mainly suited for testing and light-duty applications.
+    /// For production applications, you'll likely want to make at least a few tweaks.
+    ///
+    /// See [`PoolOptions::new()`] for details.
     pub fn connect_lazy_with(options: <DB::Connection as Connection>::Options) -> Self {
         PoolOptions::<DB>::new().connect_lazy_with(options)
     }
 
     /// Retrieves a connection from the pool.
     ///
-    /// Waits for at most the configured connection timeout before returning an error.
+    /// The total time this method is allowed to execute is capped by
+    /// [`PoolOptions::acquire_timeout`].
+    /// If that timeout elapses, this will return [`Error::PoolTimedOut`].
+    ///
+    /// ### Note: Cancellation/Timeout May Drop Connections
+    /// If `acquire` is cancelled or times out after it acquires a connection from the idle queue or
+    /// opens a new one, it will drop that connection because we don't want to assume it
+    /// is safe to return to the pool, and testing it to see if it's safe to release could introduce
+    /// subtle bugs if not implemented correctly. To avoid that entirely, we've decided to not
+    /// gracefully handle cancellation here.
+    ///
+    /// However, if your workload is sensitive to dropped connections such as using an in-memory
+    /// SQLite database with a pool size of 1, you can pretty easily ensure that a cancelled
+    /// `acquire()` call will never drop connections by tweaking your [`PoolOptions`]:
+    ///
+    /// * Set [`test_before_acquire(false)`][PoolOptions::test_before_acquire]
+    /// * Never set [`before_acquire`][PoolOptions::before_acquire] or
+    ///   [`after_connect`][PoolOptions::after_connect].
+    ///
+    /// This should eliminate any potential `.await` points between acquiring a connection and
+    /// returning it.
     pub fn acquire(&self) -> impl Future<Output = Result<PoolConnection<DB>, Error>> + 'static {
         let shared = self.0.clone();
-        async move { shared.acquire().await.map(|conn| conn.attach(&shared)) }
+        async move { shared.acquire().await.map(|conn| conn.reattach()) }
     }
 
     /// Attempts to retrieve a connection from the pool if there is one available.
     ///
-    /// Returns `None` immediately if there are no idle connections available in the pool.
+    /// Returns `None` immediately if there are no idle connections available in the pool
+    /// or there are tasks waiting for a connection which have yet to wake.
pub fn try_acquire(&self) -> Option> { - self.0 - .try_acquire() - .map(|conn| conn.into_live().attach(&self.0)) + self.0.try_acquire().map(|conn| conn.into_live().reattach()) } - /// Retrieves a new connection and immediately begins a new transaction. + /// Retrieves a connection and immediately begins a new transaction. pub async fn begin(&self) -> Result, Error> { Ok(Transaction::begin(MaybePoolConnection::PoolConnection(self.acquire().await?)).await?) } - /// Attempts to retrieve a new connection and immediately begins a new transaction if there - /// is one available. + /// Attempts to retrieve a connection and immediately begins a new transaction if successful. pub async fn try_begin(&self) -> Result>, Error> { match self.try_acquire() { Some(conn) => Transaction::begin(MaybePoolConnection::PoolConnection(conn)) @@ -312,31 +367,29 @@ impl Pool { } } - /// Shut down the connection pool, waiting for all connections to be gracefully closed. - /// - /// Upon `.await`ing this call, any currently waiting or subsequent calls to [Pool::acquire] and - /// the like will immediately return [Error::PoolClosed] and no new connections will be opened. + /// Shut down the connection pool, immediately waking all tasks waiting for a connection. /// - /// Any connections currently idle in the pool will be immediately closed, including sending - /// a graceful shutdown message to the database server, if applicable. + /// Upon calling this method, any currently waiting or subsequent calls to [`Pool::acquire`] and + /// the like will immediately return [`Error::PoolClosed`] and no new connections will be opened. + /// Checked-out connections are unaffected, but will be gracefully closed on-drop + /// rather than being returned to the pool. /// - /// Checked-out connections are unaffected, but will be closed in the same manner when they are - /// returned to the pool. + /// Returns a `Future` which can be `.await`ed to ensure all connections are + /// gracefully closed. It will first close any idle connections currently waiting in the pool, + /// then wait for all checked-out connections to be returned or closed. /// - /// Does not resolve until all connections are returned to the pool and gracefully closed. + /// Waiting for connections to be gracefully closed is optional, but will allow the database + /// server to clean up the resources sooner rather than later. This is especially important + /// for tests that create a new pool every time, otherwise you may see errors about connection + /// limits being exhausted even when running tests in a single thread. /// - /// ### Note: `async fn` - /// Because this is an `async fn`, the pool will *not* be marked as closed unless the - /// returned future is polled at least once. + /// If the returned `Future` is not run to completion, any remaining connections will be dropped + /// when the last handle for the given pool instance is dropped, which could happen in a task + /// spawned by `Pool` internally and so may be unpredictable otherwise. /// - /// If you want to close the pool but don't want to wait for all connections to be gracefully - /// closed, you can do `pool.close().now_or_never()`, which polls the future exactly once - /// with a no-op waker. - // TODO: I don't want to change the signature right now in case it turns out to be a - // breaking change, but this probably should eagerly mark the pool as closed and then the - // returned future only needs to be awaited to gracefully close the connections. 
- pub async fn close(&self) { - self.0.close().await; + /// `.close()` may be safely called and `.await`ed on multiple handles concurrently. + pub fn close(&self) -> impl Future + '_ { + self.0.close() } /// Returns `true` if [`.close()`][Pool::close] has been called on the pool, `false` otherwise. @@ -417,9 +470,7 @@ impl Pool { /// # } /// ``` pub fn close_event(&self) -> CloseEvent { - CloseEvent { - listener: (!self.is_closed()).then(|| self.0.on_closed.listen()), - } + self.0.close_event() } /// Returns the number of connections currently active. This includes idle connections. @@ -429,9 +480,11 @@ impl Pool { /// Returns the number of connections active and idle (not in use). /// - /// This will block until the number of connections stops changing for at - /// least 2 atomic accesses in a row. If the number of idle connections is - /// changing rapidly, this may run indefinitely. + /// As of 0.6.0, this has been fixed to use a separate atomic counter and so should be fine to + /// call even at high load. + /// + /// This previously called [`crossbeam::queue::ArrayQueue::len()`] which waits for the head and + /// tail pointers to be in a consistent state, which may never happen at high levels of churn. pub fn num_idle(&self) -> usize { self.0.num_idle() } @@ -449,7 +502,7 @@ impl Pool { impl Pool { /// Returns the database driver currently in-use by this `Pool`. /// - /// Determined by the connection URI. + /// Determined by the connection URL. pub fn any_kind(&self) -> AnyKind { self.0.connect_options.kind() } diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index 3be566d793..3e951cb292 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -1,40 +1,101 @@ use crate::connection::Connection; use crate::database::Database; use crate::error::Error; -use crate::pool::inner::SharedPool; +use crate::pool::inner::PoolInner; use crate::pool::Pool; use futures_core::future::BoxFuture; -use sqlx_rt::spawn; -use std::cmp; use std::fmt::{self, Debug, Formatter}; -use std::sync::Arc; use std::time::{Duration, Instant}; +/// Configuration options for [`Pool`][super::Pool]. +/// +/// ### Callback Functions: Why Do I Need `Box::pin()`? +/// Essentially, because it's impossible to write generic bounds that describe a closure +/// with a higher-ranked lifetime parameter, returning a future with that same lifetime. +/// +/// Ideally, you could define it like this: +/// ```rust,ignore +/// async fn takes_foo_callback(f: impl for<'a> Fn(&'a mut Foo) -> impl Future<'a, Output = ()>) +/// ``` +/// +/// However, the compiler does not allow using `impl Trait` in the return type of an `impl Fn`. +/// +/// And if you try to do it like this: +/// ```rust,ignore +/// async fn takes_foo_callback(f: F) +/// where +/// F: for<'a> Fn(&'a mut Foo) -> Fut, +/// Fut: for<'a> Future + 'a +/// ``` +/// +/// There's no way to tell the compiler that those two `'a`s should be the same lifetime. +/// +/// It's possible to make this work with a custom trait, but it's fiddly and requires naming +/// the type of the closure parameter. +/// +/// Having the closure return `BoxFuture` allows us to work around this, as all the type information +/// fits into a single generic parameter. 
+/// +/// We still need to `Box` the future internally to give it a concrete type to avoid leaking a type +/// parameter everywhere, and `Box` is in the prelude so it doesn't need to be manually imported, +/// so having the closure return `Pin` directly is the path of least resistance from +/// the perspectives of both API designer and consumer. pub struct PoolOptions { pub(crate) test_before_acquire: bool, pub(crate) after_connect: Option< Box< - dyn Fn(&mut DB::Connection) -> BoxFuture<'_, Result<(), Error>> + 'static + Send + Sync, + dyn Fn(&mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'_, Result<(), Error>> + + 'static + + Send + + Sync, >, >, pub(crate) before_acquire: Option< Box< - dyn Fn(&mut DB::Connection) -> BoxFuture<'_, Result> + dyn Fn( + &mut DB::Connection, + PoolConnectionMetadata, + ) -> BoxFuture<'_, Result> + + 'static + + Send + + Sync, + >, + >, + pub(crate) after_release: Option< + Box< + dyn Fn( + &mut DB::Connection, + PoolConnectionMetadata, + ) -> BoxFuture<'_, Result> + 'static + Send + Sync, >, >, - pub(crate) after_release: - Option bool + 'static + Send + Sync>>, pub(crate) max_connections: u32, - pub(crate) connect_timeout: Duration, + pub(crate) acquire_timeout: Duration, pub(crate) min_connections: u32, pub(crate) max_lifetime: Option, pub(crate) idle_timeout: Option, pub(crate) fair: bool, } +/// Metadata for the connection being processed by a [`PoolOptions`] callback. +#[derive(Debug)] // Don't want to commit to any other trait impls yet. +#[non_exhaustive] // So we can safely add fields in the future. +pub struct PoolConnectionMetadata { + /// The duration since the connection was first opened. + /// + /// For [`after_connect`][PoolOptions::after_connect], this is [`Duration::ZERO`]. + pub age: Duration, + + /// The duration that the connection spent in the idle queue. + /// + /// Only relevant for [`before_acquire`][PoolOptions::before_acquire]. + /// For other callbacks, this is [`Duration::ZERO`]. + pub idle_for: Duration, +} + impl Default for PoolOptions { fn default() -> Self { Self::new() @@ -42,15 +103,23 @@ impl Default for PoolOptions { } impl PoolOptions { + /// Returns a default "sane" configuration, suitable for testing or light-duty applications. + /// + /// Production applications will likely want to at least modify + /// [`max_connections`][Self::max_connections]. + /// + /// See the source of this method for the current default values. pub fn new() -> Self { Self { + // User-specifiable routines after_connect: None, - test_before_acquire: true, before_acquire: None, after_release: None, + test_before_acquire: true, + // A production application will want to set a higher limit than this. max_connections: 10, min_connections: 0, - connect_timeout: Duration::from_secs(30), + acquire_timeout: Duration::from_secs(30), idle_timeout: Some(Duration::from_secs(10 * 60)), max_lifetime: Some(Duration::from_secs(30 * 60)), fair: true, @@ -58,33 +127,60 @@ impl PoolOptions { } /// Set the maximum number of connections that this pool should maintain. + /// + /// Be mindful of the connection limits for your database as well as other applications + /// which may want to connect to the same database (or even multiple instances of the same + /// application in high-availability deployments). pub fn max_connections(mut self, max: u32) -> Self { self.max_connections = max; self } - /// Set the amount of time to attempt connecting to the database. - /// - /// If this timeout elapses, [`Pool::acquire`] will return an error. 
- pub fn connect_timeout(mut self, timeout: Duration) -> Self { - self.connect_timeout = timeout; - self - } - /// Set the minimum number of connections to maintain at all times. /// /// When the pool is built, this many connections will be automatically spun up. /// - /// If any connection is reaped by [`max_lifetime`] or [`idle_timeout`] and it brings - /// the connection count below this amount, a new connection will be opened to replace it. + /// If any connection is reaped by [`max_lifetime`] or [`idle_timeout`], or explicitly closed, + /// and it brings the connection count below this amount, a new connection will be opened to + /// replace it. + /// + /// This is only done on a best-effort basis, however. The routine that maintains this value + /// has a deadline so it doesn't wait forever if the database is being slow or returning errors. + /// + /// This value is clamped internally to not exceed [`max_connections`]. + /// + /// We've chosen not to assert `min_connections <= max_connections` anywhere + /// because it shouldn't break anything internally if the condition doesn't hold, + /// and if the application allows either value to be dynamically set + /// then it should be checking this condition itself and returning + /// a nicer error than a panic anyway. /// /// [`max_lifetime`]: Self::max_lifetime /// [`idle_timeout`]: Self::idle_timeout + /// [`max_connections`]: Self::max_connections pub fn min_connections(mut self, min: u32) -> Self { self.min_connections = min; self } + /// Set the maximum amount of time to spend waiting for a connection in [`Pool::acquire()`]. + /// + /// Caps the total amount of time `Pool::acquire()` can spend waiting across multiple phases: + /// + /// * First, it may need to wait for a permit from the semaphore, which grants it the privilege + /// of opening a connection or popping one from the idle queue. + /// * If an existing idle connection is acquired, by default it will be checked for liveness + /// and integrity before being returned, which may require executing a command on the + /// connection. This can be disabled with [`test_before_acquire(false)`][Self::test_before_acquire]. + /// * If [`before_acquire`][Self::before_acquire] is set, that will also be executed. + /// * If a new connection needs to be opened, that will obviously require I/O, handshaking, + /// and initialization commands. + /// * If [`after_connect`][Self::after_connect] is set, that will also be executed. + pub fn acquire_timeout(mut self, timeout: Duration) -> Self { + self.acquire_timeout = timeout; + self + } + /// Set the maximum lifetime of individual connections. /// /// Any connection with a lifetime greater than this will be closed. @@ -106,7 +202,7 @@ impl PoolOptions { /// Set a maximum idle duration for individual connections. /// - /// Any connection with an idle duration longer than this will be closed. + /// Any connection that remains in the idle queue longer than this will be closed. /// /// For usage-based database server billing, this can be a cost saver. pub fn idle_timeout(mut self, timeout: impl Into>) -> Self { @@ -141,38 +237,102 @@ impl PoolOptions { self } - /// Perform an action after connecting to the database. + /// Perform an asynchronous action after connecting to the database. /// - /// # Example + /// If the operation returns with an error then the error is logged, the connection is closed + /// and a new one is opened in its place and the callback is invoked again. 
+ /// + /// This occurs in a backoff loop to avoid high CPU usage and spamming logs during a transient + /// error condition. + /// + /// Note that this may be called for internally opened connections, such as when maintaining + /// [`min_connections`][Self::min_connections], that are then immediately returned to the pool + /// without invoking [`after_release`][Self::after_release]. + /// + /// # Example: Additional Parameters + /// This callback may be used to set additional configuration parameters + /// that are not exposed by the database's `ConnectOptions`. + /// + /// This example is written for PostgreSQL but can likely be adapted to other databases. /// /// ```no_run /// # async fn f() -> Result<(), Box> { - /// use sqlx_core::executor::Executor; - /// use sqlx_core::postgres::PgPoolOptions; - /// // PostgreSQL + /// use sqlx::Executor; + /// use sqlx::postgres::PgPoolOptions; + /// /// let pool = PgPoolOptions::new() - /// .after_connect(|conn| Box::pin(async move { - /// conn.execute("SET application_name = 'your_app';").await?; - /// conn.execute("SET search_path = 'my_schema';").await?; + /// .after_connect(|conn, _meta| Box::pin(async move { + /// // When directly invoking `Executor` methods, + /// // it is possible to execute multiple statements with one call. + /// conn.execute("SET application_name = 'your_app'; SET search_path = 'my_schema';") + /// .await?; /// - /// Ok(()) - /// })) + /// Ok(()) + /// })) /// .connect("postgres:// …").await?; /// # Ok(()) /// # } /// ``` + /// + /// For a discussion on why `Box::pin()` is required, see [the type-level docs][Self]. pub fn after_connect(mut self, callback: F) -> Self where - for<'c> F: - Fn(&'c mut DB::Connection) -> BoxFuture<'c, Result<(), Error>> + 'static + Send + Sync, + // We're passing the `PoolConnectionMetadata` here mostly for future-proofing. + // `age` and `idle_for` are obviously not useful for fresh connections. + for<'c> F: Fn(&'c mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'c, Result<(), Error>> + + 'static + + Send + + Sync, { self.after_connect = Some(Box::new(callback)); self } + /// Perform an asynchronous action on a previously idle connection before giving it out. + /// + /// Alongside the connection, the closure gets [`PoolConnectionMetadata`] which contains + /// potentially useful information such as the connection's age and the duration it was + /// idle. + /// + /// If the operation returns `Ok(true)`, the connection is returned to the task that called + /// [`Pool::acquire`]. + /// + /// If the operation returns `Ok(false)` or an error, the error is logged (if applicable) + /// and then the connection is closed and [`Pool::acquire`] tries again with another idle + /// connection. If it runs out of idle connections, it opens a new connection instead. + /// + /// This is *not* invoked for new connections. Use [`after_connect`][Self::after_connect] + /// for those. + /// + /// # Example: Custom `test_before_acquire` Logic + /// If you only want to ping connections if they've been idle a certain amount of time, + /// you can implement your own logic here: + /// + /// This example is written for Postgres but should be trivially adaptable to other databases. 
+ /// ```no_run + /// # async fn f() -> Result<(), Box> { + /// use sqlx::{Connection, Executor}; + /// use sqlx::postgres::PgPoolOptions; + /// + /// let pool = PgPoolOptions::new() + /// .test_before_acquire(false) + /// .before_acquire(|conn, meta| Box::pin(async move { + /// // One minute + /// if meta.idle_for.as_secs() > 60 { + /// conn.ping().await?; + /// } + /// + /// Ok(true) + /// })) + /// .connect("postgres:// …").await?; + /// # Ok(()) + /// # } + ///``` + /// + /// For a discussion on why `Box::pin()` is required, see [the type-level docs][Self]. pub fn before_acquire(mut self, callback: F) -> Self where - for<'c> F: Fn(&'c mut DB::Connection) -> BoxFuture<'c, Result> + for<'c> F: Fn(&'c mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'c, Result> + 'static + Send + Sync, @@ -181,67 +341,130 @@ impl PoolOptions { self } + /// Perform an asynchronous action on a connection before it is returned to the pool. + /// + /// Alongside the connection, the closure gets [`PoolConnectionMetadata`] which contains + /// potentially useful information such as the connection's age. + /// + /// If the operation returns `Ok(true)`, the connection is returned to the pool's idle queue. + /// If the operation returns `Ok(false)` or an error, the error is logged (if applicable) + /// and the connection is closed, allowing a task waiting on [`Pool::acquire`] to + /// open a new one in its place. + /// + /// # Example (Postgres): Close Memory-Hungry Connections + /// Instead of relying on [`max_lifetime`][Self::max_lifetime] to close connections, + /// we can monitor their memory usage directly and close any that have allocated too much. + /// + /// Note that this is purely an example showcasing a possible use for this callback + /// and may be flawed as it has not been tested. + /// + /// This example queries [`pg_backend_memory_contexts`](https://www.postgresql.org/docs/current/view-pg-backend-memory-contexts.html) + /// which is only allowed for superusers. + /// + /// ```no_run + /// # async fn f() -> Result<(), Box> { + /// use sqlx::{Connection, Executor}; + /// use sqlx::postgres::PgPoolOptions; + /// + /// let pool = PgPoolOptions::new() + /// // Let connections live as long as they want. + /// .max_lifetime(None) + /// .after_release(|conn, meta| Box::pin(async move { + /// // Only check connections older than 6 hours. + /// if meta.age.as_secs() < 6 * 60 * 60 { + /// return Ok(true); + /// } + /// + /// let total_memory_usage: i64 = sqlx::query_scalar( + /// "select sum(used_bytes) from pg_backend_memory_contexts" + /// ) + /// .fetch_one(conn) + /// .await?; + /// + /// // Close the connection if the backend memory usage exceeds 256 MiB. + /// Ok(total_memory_usage <= (2 << 28)) + /// })) + /// .connect("postgres:// …").await?; + /// # Ok(()) + /// # } pub fn after_release(mut self, callback: F) -> Self where - F: Fn(&mut DB::Connection) -> bool + 'static + Send + Sync, + for<'c> F: Fn(&'c mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'c, Result> + + 'static + + Send + + Sync, { self.after_release = Some(Box::new(callback)); self } - /// Creates a new pool from this configuration and immediately establishes one connection. - pub async fn connect(self, uri: &str) -> Result, Error> { - self.connect_with(uri.parse()?).await + /// Create a new pool from this `PoolOptions` and immediately open at least one connection. + /// + /// This ensures the configuration is correct. 
-    /// Creates a new pool from this configuration and immediately establishes one connection.
-    pub async fn connect(self, uri: &str) -> Result<Pool<DB>, Error> {
-        self.connect_with(uri.parse()?).await
+    /// Create a new pool from this `PoolOptions` and immediately open at least one connection.
+    ///
+    /// This ensures the configuration is correct.
+    ///
+    /// The total number of connections opened is max(1, [min_connections][Self::min_connections]).
+    ///
+    /// Refer to the relevant `ConnectOptions` impl for your database for the expected URL format:
+    ///
+    /// * Postgres: [`PgConnectOptions`][crate::postgres::PgConnectOptions]
+    /// * MySQL: [`MySqlConnectOptions`][crate::mysql::MySqlConnectOptions]
+    /// * SQLite: [`SqliteConnectOptions`][crate::sqlite::SqliteConnectOptions]
+    /// * MSSQL: [`MssqlConnectOptions`][crate::mssql::MssqlConnectOptions]
+    pub async fn connect(self, url: &str) -> Result<Pool<DB>, Error> {
+        self.connect_with(url.parse()?).await
     }
 
-    /// Creates a new pool from this configuration and immediately establishes one connection.
+    /// Create a new pool from this `PoolOptions` and immediately open at least one connection.
+    ///
+    /// This ensures the configuration is correct.
+    ///
+    /// The total number of connections opened is max(1, [min_connections][Self::min_connections]).
     pub async fn connect_with(
         self,
         options: <DB::Connection as Connection>::Options,
     ) -> Result<Pool<DB>, Error> {
-        let shared = SharedPool::new_arc(self, options);
-
-        init_min_connections(&shared).await?;
-
-        Ok(Pool(shared))
-    }
+        // Don't take longer than `acquire_timeout` starting from when this is called.
+        let deadline = Instant::now() + self.acquire_timeout;
 
-    /// Creates a new pool from this configuration and will establish a connections as the pool
-    /// starts to be used.
-    pub fn connect_lazy(self, uri: &str) -> Result<Pool<DB>, Error> {
-        Ok(self.connect_lazy_with(uri.parse()?))
-    }
+        let inner = PoolInner::new_arc(self, options);
 
-    /// Creates a new pool from this configuration and will establish a connections as the pool
-    /// starts to be used.
-    pub fn connect_lazy_with(self, options: <DB::Connection as Connection>::Options) -> Pool<DB> {
-        let shared = SharedPool::new_arc(self, options);
+        if inner.options.min_connections > 0 {
+            // If the idle reaper is spawned then this will race with the call from that task
+            // and may not report any connection errors.
+            inner.try_min_connections(deadline).await?;
+        }
 
-        let _ = spawn({
-            let shared = Arc::clone(&shared);
-            async move {
-                let _ = init_min_connections(&shared).await;
-            }
-        });
+        // If `min_connections` is nonzero then we'll likely just pull a connection
+        // from the idle queue here, but it should at least get tested first.
+        let conn = inner.acquire().await?;
+        inner.release(conn);
 
-        Pool(shared)
+        Ok(Pool(inner))
     }
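From the caller's side, the eager path under the new semantics looks roughly like the sketch below (placeholder URL; tokio runtime assumed). Both the up-front `connect` and later `acquire` calls are bounded by the same `acquire_timeout`:

```rust
use std::time::Duration;

use sqlx::postgres::PgPoolOptions;

#[tokio::main] // assumes one of the `runtime-tokio-*` features
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPoolOptions::new()
        // Startup and later acquires both give up after this long.
        .acquire_timeout(Duration::from_secs(5))
        .min_connections(2)
        .max_connections(10)
        // Placeholder URL; a bad URL or unreachable server fails here,
        // not on first use, because `connect` opens and tests a connection.
        .connect("postgres://localhost/example_db")
        .await?;

    // This acquire is also subject to `acquire_timeout`.
    let conn = pool.acquire().await?;
    drop(conn);

    Ok(())
}
```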
-}
-
-async fn init_min_connections<DB: Database>(pool: &Arc<SharedPool<DB>>) -> Result<(), Error> {
-    for _ in 0..cmp::max(pool.options.min_connections, 1) {
-        let deadline = Instant::now() + pool.options.connect_timeout;
-        let permit = pool.semaphore.acquire(1).await;
-        // this guard will prevent us from exceeding `max_size`
-        if let Ok(guard) = pool.try_increment_size(permit) {
-            // [connect] will raise an error when past deadline
-            let conn = pool.connection(deadline, guard).await?;
-            pool.release(conn);
-        }
+    /// Create a new pool from this `PoolOptions`, but don't open any connections right now.
+    ///
+    /// If [`min_connections`][Self::min_connections] is set, a background task will be spawned to
+    /// optimistically establish that many connections for the pool.
+    ///
+    /// Refer to the relevant `ConnectOptions` impl for your database for the expected URL format:
+    ///
+    /// * Postgres: [`PgConnectOptions`][crate::postgres::PgConnectOptions]
+    /// * MySQL: [`MySqlConnectOptions`][crate::mysql::MySqlConnectOptions]
+    /// * SQLite: [`SqliteConnectOptions`][crate::sqlite::SqliteConnectOptions]
+    /// * MSSQL: [`MssqlConnectOptions`][crate::mssql::MssqlConnectOptions]
+    pub fn connect_lazy(self, url: &str) -> Result<Pool<DB>, Error> {
+        Ok(self.connect_lazy_with(url.parse()?))
     }
-    Ok(())
+    /// Create a new pool from this `PoolOptions`, but don't open any connections right now.
+    ///
+    /// If [`min_connections`][Self::min_connections] is set, a background task will be spawned to
+    /// optimistically establish that many connections for the pool.
+    pub fn connect_lazy_with(self, options: <DB::Connection as Connection>::Options) -> Pool<DB> {
+        // `min_connections` is guaranteed by the idle reaper now.
+        Pool(PoolInner::new_arc(self, options))
+    }
 }
 
 impl<DB: Database> Debug for PoolOptions<DB> {
@@ -249,7 +472,7 @@
         f.debug_struct("PoolOptions")
             .field("max_connections", &self.max_connections)
             .field("min_connections", &self.min_connections)
-            .field("connect_timeout", &self.connect_timeout)
+            .field("connect_timeout", &self.acquire_timeout)
             .field("max_lifetime", &self.max_lifetime)
             .field("idle_timeout", &self.idle_timeout)
             .field("test_before_acquire", &self.test_before_acquire)
diff --git a/sqlx-core/src/postgres/connection/mod.rs b/sqlx-core/src/postgres/connection/mod.rs
index ba25ddd9b7..325b565c3b 100644
--- a/sqlx-core/src/postgres/connection/mod.rs
+++ b/sqlx-core/src/postgres/connection/mod.rs
@@ -3,12 +3,11 @@
 use std::sync::Arc;
 
 use crate::HashMap;
 use futures_core::future::BoxFuture;
-use futures_util::{FutureExt, TryFutureExt};
+use futures_util::FutureExt;
 
 use crate::common::StatementCache;
 use crate::connection::{Connection, LogSettings};
 use crate::error::Error;
-use crate::executor::Executor;
 use crate::ext::ustr::UStr;
 use crate::io::Decode;
 use crate::postgres::message::{
@@ -143,9 +142,24 @@ impl Connection for PgConnection {
         })
     }
 
+    fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
+        Box::pin(async move {
+            self.stream.shutdown().await?;
+
+            Ok(())
+        })
+    }
+
     fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
+        // Users were complaining about this showing up in query statistics on the server.
         // By sending a comment we avoid an error if the connection was in the middle of a rowset
-        self.execute("/* SQLx ping */").map_ok(|_| ()).boxed()
+        // self.execute("/* SQLx ping */").map_ok(|_| ()).boxed()
+
+        Box::pin(async move {
+            // The simplest call-and-response that's possible.
+            self.write_sync();
+            self.wait_until_ready().await
+        })
     }
 
     fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
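Because `ping` now performs a bare `Sync`/ready-for-query round trip instead of executing `/* SQLx ping */`, a plain health check should no longer register as a query in server-side statistics. A minimal sketch of driving it directly on a connection (placeholder URL; tokio runtime assumed):

```rust
use sqlx::{Connection, PgConnection};

#[tokio::main] // assumes one of the `runtime-tokio-*` features
async fn main() -> Result<(), sqlx::Error> {
    // Placeholder URL; substitute your own `DATABASE_URL`.
    let mut conn = PgConnection::connect("postgres://localhost/example_db").await?;

    // With the change above this is a protocol-level `Sync` round trip,
    // not a statement the server has to parse and record.
    conn.ping().await?;

    // A graceful `close` still sends the terminate message before the socket is dropped;
    // `close_hard` is the new, hidden escape hatch that skips that courtesy.
    conn.close().await?;

    Ok(())
}
```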
diff --git a/sqlx-core/src/postgres/error.rs b/sqlx-core/src/postgres/error.rs
index 4c528b78fb..bffccba3a6 100644
--- a/sqlx-core/src/postgres/error.rs
+++ b/sqlx-core/src/postgres/error.rs
@@ -185,6 +185,20 @@ impl DatabaseError for PgDatabaseError {
         self
     }
 
+    fn is_transient_in_connect_phase(&self) -> bool {
+        // https://www.postgresql.org/docs/current/errcodes-appendix.html
+        [
+            // too_many_connections
+            // This may be returned if we just un-gracefully closed a connection,
+            // give the database a chance to notice it and clean it up.
+            "53300",
+            // cannot_connect_now
+            // Returned if the database is still starting up.
+            "57P03",
+        ]
+        .contains(&self.code())
+    }
+
     fn constraint(&self) -> Option<&str> {
         self.constraint()
     }
diff --git a/sqlx-core/src/sqlite/connection/mod.rs b/sqlx-core/src/sqlite/connection/mod.rs
index 14234d92bd..e75d0d570c 100644
--- a/sqlx-core/src/sqlite/connection/mod.rs
+++ b/sqlx-core/src/sqlite/connection/mod.rs
@@ -155,6 +155,13 @@ impl Connection for SqliteConnection {
         })
     }
 
+    fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> {
+        Box::pin(async move {
+            drop(self);
+            Ok(())
+        })
+    }
+
     /// Ensure the background worker thread is alive and accepting commands.
     fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
         Box::pin(self.worker.ping())
diff --git a/tests/any/pool.rs b/tests/any/pool.rs
index 91b97978bf..df4c4041e2 100644
--- a/tests/any/pool.rs
+++ b/tests/any/pool.rs
@@ -12,7 +12,7 @@ async fn pool_should_invoke_after_connect() -> anyhow::Result<()> {
     let pool = AnyPoolOptions::new()
         .after_connect({
             let counter = counter.clone();
-            move |_conn| {
+            move |_conn, _meta| {
                 let counter = counter.clone();
                 Box::pin(async move {
                     counter.fetch_add(1, Ordering::SeqCst);
@@ -41,7 +41,7 @@ async fn pool_should_invoke_after_connect() -> anyhow::Result<()> {
 async fn pool_should_be_returned_failed_transactions() -> anyhow::Result<()> {
     let pool = AnyPoolOptions::new()
         .max_connections(2)
-        .connect_timeout(Duration::from_secs(3))
+        .acquire_timeout(Duration::from_secs(3))
         .connect(&dotenv::var("DATABASE_URL")?)
         .await?;
diff --git a/tests/postgres/postgres.rs b/tests/postgres/postgres.rs
index 3f30070c8f..2f6d0e0f9c 100644
--- a/tests/postgres/postgres.rs
+++ b/tests/postgres/postgres.rs
@@ -45,6 +45,39 @@ async fn it_pings() -> anyhow::Result<()> {
     Ok(())
 }
 
+#[sqlx_macros::test]
+async fn it_pings_after_suspended_query() -> anyhow::Result<()> {
+    let mut conn = new::<Postgres>().await?;
+
+    conn.execute("create temporary table processed_row(val int4 primary key)")
+        .await?;
+
+    // This query wants to return 50 rows but we only read the first one.
+    // This will return a `SuspendedPortal` that the driver currently ignores.
+    let _: i32 = sqlx::query_scalar(
+        r#"
+            insert into processed_row(val)
+            select * from generate_series(1, 50)
+            returning val
+        "#,
+    )
+    .fetch_one(&mut conn)
+    .await?;
+
+    // `Sync` closes the current autocommit transaction which presumably includes closing any
+    // suspended portals.
+    conn.ping().await?;
+
+    // Make sure that all the values got inserted even though we only read the first one back.
+    let count: i64 = sqlx::query_scalar("select count(*) from processed_row")
+        .fetch_one(&mut conn)
+        .await?;
+
+    assert_eq!(count, 50);
+
+    Ok(())
+}
+
 #[sqlx_macros::test]
 async fn it_maths() -> anyhow::Result<()> {
     let mut conn = new::<Postgres>().await?;
@@ -511,7 +544,7 @@ async fn pool_smoke_test() -> anyhow::Result<()> {
     eprintln!("starting pool");
 
     let pool = PgPoolOptions::new()
-        .connect_timeout(Duration::from_secs(5))
+        .acquire_timeout(Duration::from_secs(5))
         .min_connections(1)
         .max_connections(1)
         .connect(&dotenv::var("DATABASE_URL")?)
         .await?;
@@ -1306,6 +1339,7 @@ VALUES
     let mut conn = new::<Postgres>().await?;
 
     #[derive(Debug, sqlx::FromRow)]
+    #[allow(dead_code)] // We don't actually read these fields.
     struct Row {
         count: i64,
         items: Vec<(i32, String, RepoMemberArray)>,