Skip to content

Commit

Permalink
fix(client): retry when checking out a closed connection
Browse files Browse the repository at this point in the history
  • Loading branch information
nox committed Jun 24, 2021
1 parent f66c2c5 commit fa6db10
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 13 deletions.
41 changes: 34 additions & 7 deletions src/client/client.rs
Expand Up @@ -11,7 +11,9 @@ use http::{Method, Request, Response, Uri, Version};

use super::conn;
use super::connect::{self, sealed::Connect, Alpn, Connected, Connection};
use super::pool::{self, Key as PoolKey, Pool, Poolable, Pooled, Reservation};
use super::pool::{
self, CheckoutIsClosedError, Key as PoolKey, Pool, Poolable, Pooled, Reservation,
};
#[cfg(feature = "tcp")]
use super::HttpConnector;
use crate::body::{Body, HttpBody};
Expand Down Expand Up @@ -223,7 +225,17 @@ where
mut req: Request<B>,
pool_key: PoolKey,
) -> Result<Response<Body>, ClientError<B>> {
let mut pooled = self.connection_for(pool_key).await?;
let mut pooled = match self.connection_for(pool_key).await {
Ok(pooled) => pooled,
Err(ClientConnectError::Normal(err)) => return Err(ClientError::Normal(err)),
Err(ClientConnectError::H2CheckoutIsClosed(reason)) => {
return Err(ClientError::Canceled {
connection_reused: true,
req,
reason,
})
}
};

if pooled.is_http1() {
if req.version() == Version::HTTP_2 {
Expand Down Expand Up @@ -321,7 +333,7 @@ where
async fn connection_for(
&self,
pool_key: PoolKey,
) -> Result<Pooled<PoolClient<B>>, ClientError<B>> {
) -> Result<Pooled<PoolClient<B>>, ClientConnectError> {
// This actually races 2 different futures to try to get a ready
// connection the fastest, and to reduce connection churn.
//
Expand All @@ -337,6 +349,7 @@ where
// and then be inserted into the pool as an idle connection.
let checkout = self.pool.checkout(pool_key.clone());
let connect = self.connect_to(pool_key);
let is_ver_h2 = self.config.ver == Ver::Http2;

// The order of the `select` is depended on below...

Expand Down Expand Up @@ -380,16 +393,25 @@ where
// In both cases, we should just wait for the other future.
Either::Left((Err(err), connecting)) => {
if err.is_canceled() {
connecting.await.map_err(ClientError::Normal)
connecting.await.map_err(ClientConnectError::Normal)
} else {
Err(ClientError::Normal(err))
Err(ClientConnectError::Normal(err))
}
}
Either::Right((Err(err), checkout)) => {
if err.is_canceled() {
checkout.await.map_err(ClientError::Normal)
checkout.await.map_err(move |err| {
if is_ver_h2
&& err.is_canceled()
&& err.find_source::<CheckoutIsClosedError>().is_some()
{
ClientConnectError::H2CheckoutIsClosed(err)
} else {
ClientConnectError::Normal(err)
}
})
} else {
Err(ClientError::Normal(err))
Err(ClientConnectError::Normal(err))
}
}
}
Expand Down Expand Up @@ -722,6 +744,11 @@ impl<B> ClientError<B> {
}
}

enum ClientConnectError {
Normal(crate::Error),
H2CheckoutIsClosed(crate::Error),
}

/// A marker to identify what version a pooled connection is.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(super) enum Ver {
Expand Down
23 changes: 18 additions & 5 deletions src/client/pool.rs
@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::error::Error as StdError;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, Weak};
Expand Down Expand Up @@ -560,28 +561,40 @@ pub(super) struct Checkout<T> {
waiter: Option<oneshot::Receiver<T>>,
}

#[derive(Debug)]
pub(super) struct CheckoutIsClosedError;

impl StdError for CheckoutIsClosedError {}

impl fmt::Display for CheckoutIsClosedError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("checked out connection was closed")
}
}

impl<T: Poolable> Checkout<T> {
fn poll_waiter(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Option<crate::Result<Pooled<T>>>> {
static CANCELED: &str = "pool checkout failed";
if let Some(mut rx) = self.waiter.take() {
match Pin::new(&mut rx).poll(cx) {
Poll::Ready(Ok(value)) => {
if value.is_open() {
Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
} else {
Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED))))
Poll::Ready(Some(Err(
crate::Error::new_canceled().with(CheckoutIsClosedError)
)))
}
}
Poll::Pending => {
self.waiter = Some(rx);
Poll::Pending
}
Poll::Ready(Err(_canceled)) => {
Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED))))
}
Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err(
crate::Error::new_canceled().with("request has been canceled")
))),
}
} else {
Poll::Ready(None)
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Expand Up @@ -214,7 +214,7 @@ impl Error {
&self.inner.kind
}

fn find_source<E: StdError + 'static>(&self) -> Option<&E> {
pub(crate) fn find_source<E: StdError + 'static>(&self) -> Option<&E> {
let mut cause = self.source();
while let Some(err) = cause {
if let Some(ref typed) = err.downcast_ref() {
Expand Down

0 comments on commit fa6db10

Please sign in to comment.