Skip to content

Commit

Permalink
feat(client): redesign the Connect trait
Browse files Browse the repository at this point in the history
The original `Connect` trait had some limitations:

- There was no way to provide more details to the connector about how to
  connect, other than the `Uri`.
- There was no way for the connector to return any extra information
  about the connected transport.
- The `Error` was forced to be an `std::io::Error`.
- The transport and future had `'static` requirements.

As hyper gains HTTP/2 support, some of these things needed to be
changed. We want to allow the user to configure whether they hope to
us ALPN to start an HTTP/2 connection, and the connector needs to be
able to return back to hyper if it did so.

The new `Connect` trait is meant to solve this.

- The `connect` method now receives a `Destination` type, instead of a
  `Uri`. This allows us to include additional data about how to connect.
- The `Future` returned from `connect` now must be a tuple of the
  transport, and a `Connected` metadata value. The `Connected` includes
  possibly extra data about what happened when connecting.

BREAKING CHANGE: Custom connectors should now implement `Connect`
  directly, instead of `Service`.

  Calls to `connect` no longer take `Uri`s, but `Destination`. There
  are `scheme`, `host`, and `port` methods to query relevant
  information.

  The returned future must be a tuple of the transport and `Connected`.
  If no relevant extra information is needed, simply return
  `Connected::new()`.

Closes #1428
  • Loading branch information
seanmonstar committed Mar 19, 2018
1 parent fbc449e commit 8c52c2d
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 109 deletions.
209 changes: 156 additions & 53 deletions src/client/connect.rs
@@ -1,3 +1,10 @@
//! The `Connect` trait, and supporting types.
//!
//! This module contains:
//!
//! - A default [`HttpConnector`](HttpConnector) that does DNS resolution and
//! establishes connections over TCP.
//! - The [`Connect`](Connect) trait and related types to build custom connectors.
use std::error::Error as StdError;
use std::fmt;
use std::io;
Expand All @@ -14,38 +21,121 @@ use http::uri::Scheme;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::Handle;
use tokio::net::{TcpStream, TcpStreamNew};
use tokio_service::Service;

use super::dns;
use self::http_connector::HttpConnectorBlockingTask;

/// A connector creates an Io to a remote address..
/// Connect to a destination, returning an IO transport.
///
/// This trait is not implemented directly, and only exists to make
/// the intent clearer. A connector should implement `Service` with
/// `Request=Uri` and `Response: Io` instead.
pub trait Connect: Service<Request=Uri, Error=io::Error> + 'static {
/// The connected Io Stream.
type Output: AsyncRead + AsyncWrite + 'static;
/// A Future that will resolve to the connected Stream.
type Future: Future<Item=Self::Output, Error=io::Error> + 'static;
/// Connect to a remote address.
fn connect(&self, Uri) -> <Self as Connect>::Future;
/// A connector receives a [`Destination`](Destination) describing how a
/// connection should be estabilished, and returns a `Future` of the
/// ready connection.
pub trait Connect {
/// The connected IO Stream.
type Transport: AsyncRead + AsyncWrite + 'static;
/// An error occured when trying to connect.
type Error;
/// A Future that will resolve to the connected Transport.
type Future: Future<Item=(Self::Transport, Connected), Error=Self::Error>;
/// Connect to a destination.
fn connect(&self, dst: Destination) -> Self::Future;
}

impl<T> Connect for T
where T: Service<Request=Uri, Error=io::Error> + 'static,
T::Response: AsyncRead + AsyncWrite,
T::Future: Future<Error=io::Error>,
{
type Output = T::Response;
type Future = T::Future;
/// A set of properties to describe where and how to try to connect.
#[derive(Debug)]
pub struct Destination {
//pub(super) alpn: Alpn,
pub(super) uri: Uri,
}

/// Extra information about the connected transport.
///
/// This can be used to inform recipients about things like if ALPN
/// was used, or if connected to an HTTP proxy.
#[derive(Debug)]
pub struct Connected {
//alpn: Alpn,
pub(super) is_proxied: bool,
}

/*TODO: when HTTP1 Upgrades to H2 are added, this will be needed
#[derive(Debug)]
pub(super) enum Alpn {
Http1,
//H2,
//Http1OrH2
}
*/

impl Destination {
/// Get the protocol scheme.
#[inline]
pub fn scheme(&self) -> &str {
self.uri
.scheme_part()
.expect("destination uri has scheme")
.as_str()
}

/// Get the hostname.
#[inline]
pub fn host(&self) -> &str {
self.uri
.host()
.expect("destination uri has host")
}

/// Get the port, if specified.
#[inline]
pub fn port(&self) -> Option<u16> {
self.uri.port()
}

/*
/// Returns whether this connection must negotiate HTTP/2 via ALPN.
pub fn must_h2(&self) -> bool {
match self.alpn {
Alpn::Http1 => false,
Alpn::H2 => true,
}
}
*/
}

fn connect(&self, url: Uri) -> <Self as Connect>::Future {
self.call(url)
impl Connected {
/// Create new `Connected` type with empty metadata.
pub fn new() -> Connected {
Connected {
//alpn: Alpn::Http1,
is_proxied: false,
}
}

/// Set whether the connected transport is to an HTTP proxy.
///
/// This setting will affect if HTTP/1 requests written on the transport
/// will have the request-target in absolute-form or origin-form (such as
/// `GET http://hyper.rs/guide HTTP/1.1` or `GET /guide HTTP/1.1`).
///
/// Default is `false`.
pub fn proxy(mut self, is_proxied: bool) -> Connected {
self.is_proxied = is_proxied;
self
}

/*
/// Set that the connected transport negotiated HTTP/2 as it's
/// next protocol.
pub fn h2(mut self) -> Connected {
self.alpn = Alpn::H2;
self
}
*/
}

/// A connector for the `http` scheme.
///
/// Performs DNS resolution in a thread pool, and then connects over TCP.
#[derive(Clone)]
pub struct HttpConnector {
executor: HttpConnectExecutor,
Expand Down Expand Up @@ -109,30 +199,29 @@ impl fmt::Debug for HttpConnector {
}
}

impl Service for HttpConnector {
type Request = Uri;
type Response = TcpStream;
impl Connect for HttpConnector {
type Transport = TcpStream;
type Error = io::Error;
type Future = HttpConnecting;

fn call(&self, uri: Uri) -> Self::Future {
trace!("Http::connect({:?})", uri);
fn connect(&self, dst: Destination) -> Self::Future {
trace!("Http::connect({:?})", dst.uri);

if self.enforce_http {
if uri.scheme_part() != Some(&Scheme::HTTP) {
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
return invalid_url(InvalidUrl::NotHttp, &self.handle);
}
} else if uri.scheme_part().is_none() {
} else if dst.uri.scheme_part().is_none() {
return invalid_url(InvalidUrl::MissingScheme, &self.handle);
}

let host = match uri.host() {
let host = match dst.uri.host() {
Some(s) => s,
None => return invalid_url(InvalidUrl::MissingAuthority, &self.handle),
};
let port = match uri.port() {
let port = match dst.uri.port() {
Some(port) => port,
None => if uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
};

HttpConnecting {
Expand Down Expand Up @@ -191,7 +280,7 @@ enum State {
}

impl Future for HttpConnecting {
type Item = TcpStream;
type Item = (TcpStream, Connected);
type Error = io::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Expand Down Expand Up @@ -230,7 +319,7 @@ impl Future for HttpConnecting {
sock.set_keepalive(Some(dur))?;
}

return Ok(Async::Ready(sock));
return Ok(Async::Ready((sock, Connected::new())));
},
State::Error(ref mut e) => return Err(e.take().expect("polled more than once")),
}
Expand Down Expand Up @@ -279,23 +368,27 @@ impl ConnectingTcp {
}
}

/// Blocking task to be executed on a thread pool.
pub struct HttpConnectorBlockingTask {
work: oneshot::Execute<dns::Work>
}
// Make this Future unnameable outside of this crate.
mod http_connector {
use super::*;
// Blocking task to be executed on a thread pool.
pub struct HttpConnectorBlockingTask {
pub(super) work: oneshot::Execute<dns::Work>
}

impl fmt::Debug for HttpConnectorBlockingTask {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("HttpConnectorBlockingTask")
impl fmt::Debug for HttpConnectorBlockingTask {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("HttpConnectorBlockingTask")
}
}
}

impl Future for HttpConnectorBlockingTask {
type Item = ();
type Error = ();
impl Future for HttpConnectorBlockingTask {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
self.work.poll()
fn poll(&mut self) -> Poll<(), ()> {
self.work.poll()
}
}
}

Expand All @@ -311,35 +404,45 @@ impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor {

#[cfg(test)]
mod tests {
#![allow(deprecated)]
use std::io;
use tokio::reactor::Core;
use super::{Connect, HttpConnector};
use super::{Connect, Destination, HttpConnector};

#[test]
fn test_errors_missing_authority() {
let mut core = Core::new().unwrap();
let url = "/foo/bar?baz".parse().unwrap();
let uri = "/foo/bar?baz".parse().unwrap();
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1, &core.handle());

assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
}

#[test]
fn test_errors_enforce_http() {
let mut core = Core::new().unwrap();
let url = "https://example.domain/foo/bar?baz".parse().unwrap();
let uri = "https://example.domain/foo/bar?baz".parse().unwrap();
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1, &core.handle());

assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
}


#[test]
fn test_errors_missing_scheme() {
let mut core = Core::new().unwrap();
let url = "example.domain".parse().unwrap();
let uri = "example.domain".parse().unwrap();
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1, &core.handle());

assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
}

0 comments on commit 8c52c2d

Please sign in to comment.