Skip to content

Commit

Permalink
feat(client): change connectors to return an impl Connection
Browse files Browse the repository at this point in the history
Instead of returning a tuple `(impl AsyncRead + AsyncWrite, Connected)`,
this adds a new trait, `hyper::client::connect::Connection`, which
allows querying the connection type for a `Connected`.

BREAKING CHANGE: Connectors no longer return a tuple of
  `(T, Connected)`, but a single `T: Connection`.
  • Loading branch information
seanmonstar committed Dec 5, 2019
1 parent 319e8ae commit 4d7a226
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 37 deletions.
3 changes: 1 addition & 2 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
use std::env;
use std::io::{self, Write};

use hyper::Client;
use futures_util::StreamExt;
use hyper::{Client, body::HttpBody as _};

// A simple type alias so as to DRY.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand Down
32 changes: 19 additions & 13 deletions src/client/connect/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio::net::TcpStream;
use tokio::time::Delay;

use super::dns::{self, resolve, GaiResolver, Resolve};
use super::{Connected};
use super::{Connected, Connection};
//#[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver;


Expand Down Expand Up @@ -234,7 +234,7 @@ where
R: Resolve + Clone + Send + Sync + 'static,
R::Future: Send,
{
type Response = (TcpStream, Connected);
type Response = TcpStream;
type Error = ConnectError;
type Future = HttpConnecting<R>;

Expand All @@ -259,7 +259,7 @@ where
async fn call_async(
&mut self,
dst: Uri,
) -> Result<(TcpStream, Connected), ConnectError> {
) -> Result<TcpStream, ConnectError> {
trace!(
"Http::connect; scheme={:?}, host={:?}, port={:?}",
dst.scheme(),
Expand Down Expand Up @@ -340,14 +340,20 @@ where
sock.set_nodelay(config.nodelay)
.map_err(ConnectError::m("tcp set_nodelay error"))?;

let extra = HttpInfo {
remote_addr: sock
.peer_addr()
.map_err(ConnectError::m("tcp peer_addr error"))?,
};
let connected = Connected::new().extra(extra);
Ok(sock)
}
}

Ok((sock, connected))
impl Connection for TcpStream {
fn connected(&self) -> Connected {
let connected = Connected::new();
if let Ok(remote_addr) = self.peer_addr() {
connected.extra(HttpInfo {
remote_addr,
})
} else {
connected
}
}
}

Expand All @@ -372,7 +378,7 @@ pub struct HttpConnecting<R> {
_marker: PhantomData<R>,
}

type ConnectResult = Result<(TcpStream, Connected), ConnectError>;
type ConnectResult = Result<TcpStream, ConnectError>;
type BoxConnecting = Pin<Box<dyn Future<Output = ConnectResult> + Send>>;

impl<R: Resolve> Future for HttpConnecting<R> {
Expand Down Expand Up @@ -644,12 +650,12 @@ mod tests {
use ::http::Uri;

use super::super::sealed::Connect;
use super::{Connected, HttpConnector};
use super::HttpConnector;

async fn connect<C>(
connector: C,
dst: Uri,
) -> Result<(C::Transport, Connected), C::Error>
) -> Result<C::Transport, C::Error>
where
C: Connect,
{
Expand Down
20 changes: 13 additions & 7 deletions src/client/connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ use ::http::{Response};
#[cfg(feature = "tcp")] mod http;
#[cfg(feature = "tcp")] pub use self::http::{HttpConnector, HttpInfo};

/// Describes a type returned by a connector.
pub trait Connection {
/// Return metadata describing the connection.
fn connected(&self) -> Connected;
}

/// Extra information about the connected transport.
///
/// This can be used to inform recipients about things like if ALPN
Expand Down Expand Up @@ -167,7 +173,7 @@ pub(super) mod sealed {
use tokio::io::{AsyncRead, AsyncWrite};

use crate::common::{Future, Unpin};
use super::{Connected};
use super::{Connection};

/// Connect to a destination, returning an IO transport.
///
Expand All @@ -183,21 +189,21 @@ pub(super) mod sealed {
// fit the `Connect` bounds because of the blanket impl for `Service`.
pub trait Connect: Sealed + Sized {
/// The connected IO Stream.
type Transport: AsyncRead + AsyncWrite;
type Transport: AsyncRead + AsyncWrite + Connection;
/// An error occured when trying to connect.
type Error: Into<Box<dyn StdError + Send + Sync>>;
/// A Future that will resolve to the connected Transport.
type Future: Future<Output=Result<(Self::Transport, Connected), Self::Error>>;
type Future: Future<Output=Result<Self::Transport, Self::Error>>;
#[doc(hidden)]
fn connect(self, internal_only: Internal, dst: Uri) -> Self::Future;
}

impl<S, T> Connect for S
where
S: tower_service::Service<Uri, Response=(T, Connected)> + Send,
S: tower_service::Service<Uri, Response=T> + Send,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
{
type Transport = T;
type Error = S::Error;
Expand All @@ -209,10 +215,10 @@ pub(super) mod sealed {

impl<S, T> Sealed for S
where
S: tower_service::Service<Uri, Response=(T, Connected)> + Send,
S: tower_service::Service<Uri, Response=T> + Send,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
{}

pub trait Sealed {}
Expand Down
5 changes: 3 additions & 2 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use http::uri::Scheme;

use crate::body::{Body, Payload};
use crate::common::{lazy as hyper_lazy, BoxSendFuture, Executor, Lazy, Future, Pin, Poll, task};
use self::connect::{Alpn, sealed::Connect, Connected};
use self::connect::{Alpn, sealed::Connect, Connected, Connection};
use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation};

#[cfg(feature = "tcp")] pub use self::connect::HttpConnector;
Expand Down Expand Up @@ -478,7 +478,8 @@ where C: Connect + Clone + Send + Sync + 'static,
};
Either::Left(connector.connect(connect::sealed::Internal, dst)
.map_err(crate::Error::new_connect)
.and_then(move |(io, connected)| {
.and_then(move |io| {
let connected = io.connected();
// If ALPN is h2 and we aren't http2_only already,
// then we need to convert our pool checkout into
// a single HTTP2 one.
Expand Down
43 changes: 31 additions & 12 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ mod dispatch_impl {
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;

use hyper::client::connect::{Connected, HttpConnector};
use hyper::client::connect::{Connected, Connection, HttpConnector};
use hyper::Client;

#[test]
Expand Down Expand Up @@ -1740,7 +1740,7 @@ mod dispatch_impl {
}

impl hyper::service::Service<Uri> for DebugConnector {
type Response = (DebugStream, Connected);
type Response = DebugStream;
type Error = <HttpConnector as hyper::service::Service<Uri>>::Error;
type Future = Pin<Box<dyn Future<
Output = Result<Self::Response, Self::Error>
Expand All @@ -1756,38 +1756,45 @@ mod dispatch_impl {
let closes = self.closes.clone();
let is_proxy = self.is_proxy;
let is_alpn_h2 = self.alpn_h2;
Box::pin(self.http.call(dst).map_ok(move |(s, mut c)| {
if is_alpn_h2 {
c = c.negotiated_h2();
Box::pin(self.http.call(dst).map_ok(move |tcp| {
DebugStream {
tcp,
on_drop: closes,
is_alpn_h2,
is_proxy,
}
(DebugStream(s, closes), c.proxy(is_proxy))
}))
}
}

struct DebugStream(TcpStream, mpsc::Sender<()>);
struct DebugStream {
tcp: TcpStream,
on_drop: mpsc::Sender<()>,
is_alpn_h2: bool,
is_proxy: bool,
}

impl Drop for DebugStream {
fn drop(&mut self) {
let _ = self.1.try_send(());
let _ = self.on_drop.try_send(());
}
}

impl AsyncWrite for DebugStream {
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.0).poll_shutdown(cx)
Pin::new(&mut self.tcp).poll_shutdown(cx)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.0).poll_flush(cx)
Pin::new(&mut self.tcp).poll_flush(cx)
}

fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.0).poll_write(cx, buf)
Pin::new(&mut self.tcp).poll_write(cx, buf)
}
}

Expand All @@ -1797,7 +1804,19 @@ mod dispatch_impl {
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.0).poll_read(cx, buf)
Pin::new(&mut self.tcp).poll_read(cx, buf)
}
}

impl Connection for DebugStream {
fn connected(&self) -> Connected {
let connected = self.tcp.connected().proxy(self.is_proxy);

if self.is_alpn_h2 {
connected.negotiated_h2()
} else {
connected
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1779,7 +1779,7 @@ impl tower_service::Service<Request<Body>> for TestService {
let replies = self.reply.clone();

Box::pin(async move {
while let Some(chunk) = req.body_mut().next().await {
while let Some(chunk) = hyper::body::HttpBody::next(req.body_mut()).await {
match chunk {
Ok(chunk) => {
tx.send(Msg::Chunk(chunk.to_vec())).unwrap();
Expand Down

0 comments on commit 4d7a226

Please sign in to comment.