Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(client): retry when check out returns closed connection with h2 only #2585

Merged
merged 3 commits into from Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 34 additions & 7 deletions src/client/client.rs
Expand Up @@ -11,7 +11,9 @@ use http::{Method, Request, Response, Uri, Version};

use super::conn;
use super::connect::{self, sealed::Connect, Alpn, Connected, Connection};
use super::pool::{self, Key as PoolKey, Pool, Poolable, Pooled, Reservation};
use super::pool::{
self, CheckoutIsClosedError, Key as PoolKey, Pool, Poolable, Pooled, Reservation,
};
#[cfg(feature = "tcp")]
use super::HttpConnector;
use crate::body::{Body, HttpBody};
Expand Down Expand Up @@ -223,7 +225,17 @@ where
mut req: Request<B>,
pool_key: PoolKey,
) -> Result<Response<Body>, ClientError<B>> {
let mut pooled = self.connection_for(pool_key).await?;
let mut pooled = match self.connection_for(pool_key).await {
Ok(pooled) => pooled,
Err(ClientConnectError::Normal(err)) => return Err(ClientError::Normal(err)),
Err(ClientConnectError::H2CheckoutIsClosed(reason)) => {
return Err(ClientError::Canceled {
connection_reused: true,
req,
reason,
})
}
};

if pooled.is_http1() {
if req.version() == Version::HTTP_2 {
Expand Down Expand Up @@ -321,7 +333,7 @@ where
async fn connection_for(
&self,
pool_key: PoolKey,
) -> Result<Pooled<PoolClient<B>>, ClientError<B>> {
) -> Result<Pooled<PoolClient<B>>, ClientConnectError> {
// This actually races 2 different futures to try to get a ready
// connection the fastest, and to reduce connection churn.
//
Expand All @@ -337,6 +349,7 @@ where
// and then be inserted into the pool as an idle connection.
let checkout = self.pool.checkout(pool_key.clone());
let connect = self.connect_to(pool_key);
let is_ver_h2 = self.config.ver == Ver::Http2;

// The order of the `select` is depended on below...

Expand Down Expand Up @@ -380,16 +393,25 @@ where
// In both cases, we should just wait for the other future.
Either::Left((Err(err), connecting)) => {
if err.is_canceled() {
connecting.await.map_err(ClientError::Normal)
connecting.await.map_err(ClientConnectError::Normal)
} else {
Err(ClientError::Normal(err))
Err(ClientConnectError::Normal(err))
}
}
Either::Right((Err(err), checkout)) => {
if err.is_canceled() {
checkout.await.map_err(ClientError::Normal)
checkout.await.map_err(move |err| {
if is_ver_h2
&& err.is_canceled()
&& err.find_source::<CheckoutIsClosedError>().is_some()
{
ClientConnectError::H2CheckoutIsClosed(err)
} else {
ClientConnectError::Normal(err)
}
})
} else {
Err(ClientError::Normal(err))
Err(ClientConnectError::Normal(err))
}
}
}
Expand Down Expand Up @@ -722,6 +744,11 @@ impl<B> ClientError<B> {
}
}

enum ClientConnectError {
Normal(crate::Error),
H2CheckoutIsClosed(crate::Error),
}

/// A marker to identify what version a pooled connection is.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(super) enum Ver {
Expand Down
23 changes: 18 additions & 5 deletions src/client/pool.rs
@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::error::Error as StdError;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, Weak};
Expand Down Expand Up @@ -560,28 +561,40 @@ pub(super) struct Checkout<T> {
waiter: Option<oneshot::Receiver<T>>,
}

#[derive(Debug)]
pub(super) struct CheckoutIsClosedError;

impl StdError for CheckoutIsClosedError {}

impl fmt::Display for CheckoutIsClosedError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("checked out connection was closed")
}
}

impl<T: Poolable> Checkout<T> {
fn poll_waiter(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Option<crate::Result<Pooled<T>>>> {
static CANCELED: &str = "pool checkout failed";
if let Some(mut rx) = self.waiter.take() {
match Pin::new(&mut rx).poll(cx) {
Poll::Ready(Ok(value)) => {
if value.is_open() {
Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
} else {
Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED))))
Poll::Ready(Some(Err(
crate::Error::new_canceled().with(CheckoutIsClosedError)
)))
}
}
Poll::Pending => {
self.waiter = Some(rx);
Poll::Pending
}
Poll::Ready(Err(_canceled)) => {
Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED))))
}
Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err(
crate::Error::new_canceled().with("request has been canceled")
))),
}
} else {
Poll::Ready(None)
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Expand Up @@ -214,7 +214,7 @@ impl Error {
&self.inner.kind
}

fn find_source<E: StdError + 'static>(&self) -> Option<&E> {
pub(crate) fn find_source<E: StdError + 'static>(&self) -> Option<&E> {
let mut cause = self.source();
while let Some(err) = cause {
if let Some(ref typed) = err.downcast_ref() {
Expand Down