From d67e49f1491327a78f804bab32804dc6c73d2974 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 21 Oct 2019 17:26:21 -0700 Subject: [PATCH] feat(client): change `Connect` trait into an alias for `Service` The `Connect` trait is now essentially an alias for `Service`, with a blanket implementation as such, and is sealed. Closes #1902 BREAKING CHANGE: Any manual implementations of `Connect` must instead implement `tower::Service`. --- src/client/connect/http.rs | 163 ++++++++++++++++++++++--------------- src/client/connect/mod.rs | 95 +++++++++++++-------- src/client/mod.rs | 20 ++--- src/service/mod.rs | 3 + src/service/oneshot.rs | 70 ++++++++++++++++ tests/client.rs | 18 ++-- 6 files changed, 256 insertions(+), 113 deletions(-) create mode 100644 src/service/oneshot.rs diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index ad60fd7d7b..7f40ee1770 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -3,6 +3,7 @@ use std::error::Error as StdError; use std::io; use std::mem; use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; use std::time::Duration; use http::uri::{Scheme, Uri}; @@ -13,7 +14,7 @@ use tokio_net::tcp::TcpStream; use tokio_timer::{Delay, Timeout}; use crate::common::{Future, Pin, Poll, task}; -use super::{Connect, Connected, Destination}; +use super::{Connected, Destination}; use super::dns::{self, GaiResolver, Resolve}; #[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver; @@ -30,17 +31,8 @@ type ConnectFuture = Pin> + Send>> /// transport information such as the remote socket address used. #[derive(Clone)] pub struct HttpConnector { - enforce_http: bool, - handle: Option, - connect_timeout: Option, - happy_eyeballs_timeout: Option, - keep_alive_timeout: Option, - local_address: Option, - nodelay: bool, + config: Arc, resolver: R, - reuse_address: bool, - send_buffer_size: Option, - recv_buffer_size: Option, } /// Extra information about the transport when an HttpConnector is used. @@ -76,6 +68,22 @@ pub struct HttpInfo { remote_addr: SocketAddr, } +#[derive(Clone)] +struct Config { + connect_timeout: Option, + enforce_http: bool, + handle: Option, + happy_eyeballs_timeout: Option, + keep_alive_timeout: Option, + local_address: Option, + nodelay: bool, + reuse_address: bool, + send_buffer_size: Option, + recv_buffer_size: Option, +} + +// ===== impl HttpConnector ===== + impl HttpConnector { /// Construct a new HttpConnector. pub fn new() -> HttpConnector { @@ -100,17 +108,19 @@ impl HttpConnector { /// Takes a `Resolve` to handle DNS lookups. pub fn new_with_resolver(resolver: R) -> HttpConnector { HttpConnector { - enforce_http: true, - handle: None, - connect_timeout: None, - happy_eyeballs_timeout: Some(Duration::from_millis(300)), - keep_alive_timeout: None, - local_address: None, - nodelay: false, + config: Arc::new(Config { + connect_timeout: None, + enforce_http: true, + handle: None, + happy_eyeballs_timeout: Some(Duration::from_millis(300)), + keep_alive_timeout: None, + local_address: None, + nodelay: false, + reuse_address: false, + send_buffer_size: None, + recv_buffer_size: None, + }), resolver, - reuse_address: false, - send_buffer_size: None, - recv_buffer_size: None, } } @@ -119,7 +129,7 @@ impl HttpConnector { /// Enabled by default. #[inline] pub fn enforce_http(&mut self, is_enforced: bool) { - self.enforce_http = is_enforced; + self.config_mut().enforce_http = is_enforced; } /// Set a handle to a `Reactor` to register connections to. @@ -127,7 +137,7 @@ impl HttpConnector { /// If `None`, the implicit default reactor will be used. #[inline] pub fn set_reactor(&mut self, handle: Option) { - self.handle = handle; + self.config_mut().handle = handle; } /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration. @@ -137,7 +147,7 @@ impl HttpConnector { /// Default is `None`. #[inline] pub fn set_keepalive(&mut self, dur: Option) { - self.keep_alive_timeout = dur; + self.config_mut().keep_alive_timeout = dur; } /// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`. @@ -145,19 +155,19 @@ impl HttpConnector { /// Default is `false`. #[inline] pub fn set_nodelay(&mut self, nodelay: bool) { - self.nodelay = nodelay; + self.config_mut().nodelay = nodelay; } /// Sets the value of the SO_SNDBUF option on the socket. #[inline] pub fn set_send_buffer_size(&mut self, size: Option) { - self.send_buffer_size = size; + self.config_mut().send_buffer_size = size; } /// Sets the value of the SO_RCVBUF option on the socket. #[inline] pub fn set_recv_buffer_size(&mut self, size: Option) { - self.recv_buffer_size = size; + self.config_mut().recv_buffer_size = size; } /// Set that all sockets are bound to the configured address before connection. @@ -167,7 +177,7 @@ impl HttpConnector { /// Default is `None`. #[inline] pub fn set_local_address(&mut self, addr: Option) { - self.local_address = addr; + self.config_mut().local_address = addr; } /// Set the connect timeout. @@ -178,7 +188,7 @@ impl HttpConnector { /// Default is `None`. #[inline] pub fn set_connect_timeout(&mut self, dur: Option) { - self.connect_timeout = dur; + self.config_mut().connect_timeout = dur; } /// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm. @@ -195,7 +205,7 @@ impl HttpConnector { /// [RFC 6555]: https://tools.ietf.org/html/rfc6555 #[inline] pub fn set_happy_eyeballs_timeout(&mut self, dur: Option) { - self.happy_eyeballs_timeout = dur; + self.config_mut().happy_eyeballs_timeout = dur; } /// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`. @@ -203,9 +213,18 @@ impl HttpConnector { /// Default is `false`. #[inline] pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self { - self.reuse_address = reuse_address; + self.config_mut().reuse_address = reuse_address; self } + + // private + + fn config_mut(&mut self) -> &mut Config { + // If the are HttpConnector clones, this will clone the inner + // config. So mutating the config won't ever affect previous + // clones. + Arc::make_mut(&mut self.config) + } } // R: Debug required for now to allow adding it to debug output later... @@ -216,16 +235,24 @@ impl fmt::Debug for HttpConnector { } } -impl Connect for HttpConnector +impl tower_service::Service for HttpConnector where R: Resolve + Clone + Send + Sync, R::Future: Send, { - type Transport = TcpStream; + type Response = (TcpStream, Connected); type Error = io::Error; type Future = HttpConnecting; - fn connect(&self, dst: Destination) -> Self::Future { + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + // For now, always ready. + // TODO: When `Resolve` becomes an alias for `Service`, check + // the resolver's readiness. + drop(cx); + Poll::Ready(Ok(())) + } + + fn call(&mut self, dst: Destination) -> Self::Future { trace!( "Http::connect; scheme={}, host={}, port={:?}", dst.scheme(), @@ -233,17 +260,17 @@ where dst.port(), ); - if self.enforce_http { + if self.config.enforce_http { if dst.uri.scheme_part() != Some(&Scheme::HTTP) { - return invalid_url(InvalidUrl::NotHttp, &self.handle); + return invalid_url(InvalidUrl::NotHttp, &self.config.handle); } } else if dst.uri.scheme_part().is_none() { - return invalid_url(InvalidUrl::MissingScheme, &self.handle); + return invalid_url(InvalidUrl::MissingScheme, &self.config.handle); } let host = match dst.uri.host() { Some(s) => s, - None => return invalid_url(InvalidUrl::MissingAuthority, &self.handle), + None => return invalid_url(InvalidUrl::MissingAuthority, &self.config.handle), }; let port = match dst.uri.port_part() { Some(port) => port.as_u16(), @@ -251,16 +278,16 @@ where }; HttpConnecting { - state: State::Lazy(self.resolver.clone(), host.into(), self.local_address), - handle: self.handle.clone(), - connect_timeout: self.connect_timeout, - happy_eyeballs_timeout: self.happy_eyeballs_timeout, - keep_alive_timeout: self.keep_alive_timeout, - nodelay: self.nodelay, + state: State::Lazy(self.resolver.clone(), host.into(), self.config.local_address), + handle: self.config.handle.clone(), + connect_timeout: self.config.connect_timeout, + happy_eyeballs_timeout: self.config.happy_eyeballs_timeout, + keep_alive_timeout: self.config.keep_alive_timeout, + nodelay: self.config.nodelay, port, - reuse_address: self.reuse_address, - send_buffer_size: self.send_buffer_size, - recv_buffer_size: self.recv_buffer_size, + reuse_address: self.config.reuse_address, + send_buffer_size: self.config.send_buffer_size, + recv_buffer_size: self.config.recv_buffer_size, } } } @@ -289,17 +316,17 @@ where dst.port(), ); - if self.enforce_http { + if self.config.enforce_http { if dst.uri.scheme_part() != Some(&Scheme::HTTP) { - return invalid_url::(InvalidUrl::NotHttp, &self.handle).map_ok(|(s, _)| s).boxed(); + return invalid_url::(InvalidUrl::NotHttp, &self.config.handle).map_ok(|(s, _)| s).boxed(); } } else if dst.uri.scheme_part().is_none() { - return invalid_url::(InvalidUrl::MissingScheme, &self.handle).map_ok(|(s, _)| s).boxed(); + return invalid_url::(InvalidUrl::MissingScheme, &self.config.handle).map_ok(|(s, _)| s).boxed(); } let host = match dst.uri.host() { Some(s) => s, - None => return invalid_url::(InvalidUrl::MissingAuthority, &self.handle).map_ok(|(s, _)| s).boxed(), + None => return invalid_url::(InvalidUrl::MissingAuthority, &self.config.handle).map_ok(|(s, _)| s).boxed(), }; let port = match dst.uri.port_part() { Some(port) => port.as_u16(), @@ -307,16 +334,16 @@ where }; let fut = HttpConnecting { - state: State::Lazy(self.resolver.clone(), host.into(), self.local_address), - handle: self.handle.clone(), - connect_timeout: self.connect_timeout, - happy_eyeballs_timeout: self.happy_eyeballs_timeout, - keep_alive_timeout: self.keep_alive_timeout, - nodelay: self.nodelay, + state: State::Lazy(self.resolver.clone(), host.into(), self.config.local_address), + handle: self.config.handle.clone(), + connect_timeout: self.config.connect_timeout, + happy_eyeballs_timeout: self.config.happy_eyeballs_timeout, + keep_alive_timeout: self.config.keep_alive_timeout, + nodelay: self.config.nodelay, port, - reuse_address: self.reuse_address, - send_buffer_size: self.send_buffer_size, - recv_buffer_size: self.recv_buffer_size, + reuse_address: self.config.reuse_address, + send_buffer_size: self.config.send_buffer_size, + recv_buffer_size: self.config.recv_buffer_size, }; fut.map_ok(|(s, _)| s).boxed() @@ -671,7 +698,15 @@ mod tests { use tokio::runtime::current_thread::Runtime; use tokio_net::driver::Handle; - use super::{Connect, Destination, HttpConnector}; + use super::{Connected, Destination, HttpConnector}; + use super::super::sealed::Connect; + + async fn connect(connector: C, dst: Destination) -> Result<(C::Transport, Connected), C::Error> + where + C: Connect, + { + connector.connect(super::super::sealed::Internal, dst).await + } #[test] fn test_errors_missing_authority() { @@ -684,7 +719,7 @@ mod tests { rt.block_on(async { assert_eq!( - connector.connect(dst).await.unwrap_err().kind(), + connect(connector, dst).await.unwrap_err().kind(), io::ErrorKind::InvalidInput, ); }) @@ -701,7 +736,7 @@ mod tests { rt.block_on(async { assert_eq!( - connector.connect(dst).await.unwrap_err().kind(), + connect(connector, dst).await.unwrap_err().kind(), io::ErrorKind::InvalidInput, ); }) @@ -718,7 +753,7 @@ mod tests { rt.block_on(async { assert_eq!( - connector.connect(dst).await.unwrap_err().kind(), + connect(connector, dst).await.unwrap_err().kind(), io::ErrorKind::InvalidInput, ); }); diff --git a/src/client/connect/mod.rs b/src/client/connect/mod.rs index 39429764dc..a446b41c1d 100644 --- a/src/client/connect/mod.rs +++ b/src/client/connect/mod.rs @@ -1,52 +1,23 @@ -//! The `Connect` trait, and supporting types. +//! Connectors used by the `Client`. //! //! This module contains: //! //! - A default [`HttpConnector`](HttpConnector) that does DNS resolution and //! establishes connections over TCP. -//! - The [`Connect`](Connect) trait and related types to build custom connectors. +//! - Types to build custom connectors. use std::convert::TryFrom; -use std::error::Error as StdError; use std::{fmt, mem}; use bytes::{BufMut, Bytes, BytesMut}; use ::http::{uri, Response, Uri}; -use tokio_io::{AsyncRead, AsyncWrite}; - -use crate::common::{Future, Unpin}; #[cfg(feature = "tcp")] pub mod dns; #[cfg(feature = "tcp")] mod http; #[cfg(feature = "tcp")] pub use self::http::{HttpConnector, HttpInfo}; -/// Connect to a destination, returning an IO transport. -/// -/// A connector receives a [`Destination`](Destination) describing how a -/// connection should be estabilished, and returns a `Future` of the -/// ready connection. -pub trait Connect: Send + Sync { - /// The connected IO Stream. - type Transport: AsyncRead + AsyncWrite + Unpin + Send + 'static; - /// An error occured when trying to connect. - type Error: Into>; - /// A Future that will resolve to the connected Transport. - type Future: Future> + Unpin + Send; - /// Connect to a destination. - fn connect(&self, dst: Destination) -> Self::Future; -} - -impl Connect for Box { - type Transport = ::Transport; - type Error = ::Error; - type Future = ::Future; - fn connect(&self, dst: Destination) -> Self::Future { - ::connect(self, dst) - } -} - /// A set of properties to describe where and how to try to connect. /// -/// This type is passed an argument for the [`Connect`](Connect) trait. +/// This type is passed an argument to connectors. #[derive(Clone, Debug)] pub struct Destination { pub(super) uri: Uri, @@ -398,6 +369,66 @@ where } } +pub(super) mod sealed { + use std::error::Error as StdError; + + use tokio_io::{AsyncRead, AsyncWrite}; + + use crate::common::{Future, Unpin}; + use super::{Connected, Destination}; + + /// Connect to a destination, returning an IO transport. + /// + /// A connector receives a [`Destination`](Destination) describing how a + /// connection should be estabilished, and returns a `Future` of the + /// ready connection. + /// + /// # Trait Alias + /// + /// This is really just an *alias* for the `tower::Service` trait, with + /// additional bounds set for convenience *inside* hyper. You don't actually + /// implement this trait, but `tower::Service` instead. + // The `Sized` bound is to prevent creating `dyn Connect`, since they cannot + // fit the `Connect` bounds because of the blanket impl for `Service`. + pub trait Connect: Sealed + Sized { + /// The connected IO Stream. + type Transport: AsyncRead + AsyncWrite; + /// An error occured when trying to connect. + type Error: Into>; + /// A Future that will resolve to the connected Transport. + type Future: Future>; + #[doc(hidden)] + fn connect(self, internal_only: Internal, dst: Destination) -> Self::Future; + } + + impl Connect for S + where + S: tower_service::Service + Send, + S::Error: Into>, + S::Future: Unpin + Send, + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + type Transport = T; + type Error = S::Error; + type Future = crate::service::Oneshot; + fn connect(self, _: Internal, dst: Destination) -> Self::Future { + crate::service::oneshot(self, dst) + } + } + + impl Sealed for S + where + S: tower_service::Service + Send, + S::Error: Into>, + S::Future: Unpin + Send, + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + {} + + pub trait Sealed {} + #[allow(missing_debug_implementations)] + pub struct Internal; +} + #[cfg(test)] mod tests { use super::{Connected, Destination, TryFrom}; diff --git a/src/client/mod.rs b/src/client/mod.rs index 318d804572..c2c8afb193 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -72,7 +72,7 @@ use http::uri::Scheme; use crate::body::{Body, Payload}; use crate::common::{lazy as hyper_lazy, Lazy, Future, Pin, Poll, task}; -use self::connect::{Alpn, Connect, Connected, Destination}; +use self::connect::{Alpn, sealed::Connect, Connected, Destination}; use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation}; #[cfg(feature = "tcp")] pub use self::connect::HttpConnector; @@ -89,7 +89,7 @@ mod tests; pub struct Client { config: Config, conn_builder: conn::Builder, - connector: Arc, + connector: C, pool: Pool>, } @@ -158,9 +158,9 @@ impl Client<(), Body> { } impl Client -where C: Connect + Sync + 'static, - C::Transport: 'static, - C::Future: 'static, +where C: Connect + Clone + Send + Sync + 'static, + C::Transport: Unpin + Send + 'static, + C::Future: Unpin + Send + 'static, B: Payload + Unpin + Send + 'static, B::Data: Send + Unpin, { @@ -486,7 +486,7 @@ where C: Connect + Sync + 'static, return Either::Right(future::err(canceled)); } }; - Either::Left(connector.connect(dst) + Either::Left(connector.connect(connect::sealed::Internal, dst) .map_err(crate::Error::new_connect) .and_then(move |(io, connected)| { // If ALPN is h2 and we aren't http2_only already, @@ -544,7 +544,7 @@ where C: Connect + Sync + 'static, } } -impl Clone for Client { +impl Clone for Client { fn clone(&self) -> Client { Client { config: self.config.clone(), @@ -1038,16 +1038,14 @@ impl Builder { /// Combine the configuration of this builder with a connector to create a `Client`. pub fn build(&self, connector: C) -> Client where - C: Connect, - C::Transport: 'static, - C::Future: 'static, + C: Connect + Clone, B: Payload + Send, B::Data: Send, { Client { config: self.client_config, conn_builder: self.conn_builder.clone(), - connector: Arc::new(connector), + connector, pool: Pool::new(self.pool_config, &self.conn_builder.exec), } } diff --git a/src/service/mod.rs b/src/service/mod.rs index 82d7f10325..26d21b9188 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -38,10 +38,13 @@ pub use tower_service::Service; mod http; mod make; +mod oneshot; mod util; pub(crate) use self::make::{MakeConnection, MakeServiceRef}; pub(crate) use self::http::HttpService; +pub(crate) use self::oneshot::{oneshot, Oneshot}; pub use self::make::make_service_fn; pub use self::util::service_fn; + diff --git a/src/service/oneshot.rs b/src/service/oneshot.rs new file mode 100644 index 0000000000..1cf4ea5348 --- /dev/null +++ b/src/service/oneshot.rs @@ -0,0 +1,70 @@ +// TODO: Eventually to be replaced with tower_util::Oneshot. + +use std::mem; +use std::marker::Unpin; + +use tower_service::Service; + +use crate::common::{Future, Pin, Poll, task}; + +pub(crate) fn oneshot(svc: S, req: Req) -> Oneshot +where + S: Service, +{ + Oneshot { + state: State::NotReady(svc, req), + } +} + +// A `Future` consuming a `Service` and request, waiting until the `Service` +// is ready, and then calling `Service::call` with the request, and +// waiting for that `Future`. +#[allow(missing_debug_implementations)] +pub struct Oneshot, Req> { + state: State, +} + +enum State, Req> { + NotReady(S, Req), + Called(S::Future), + Tmp, +} + +// Unpin is projected to S::Future, but never S. +impl Unpin for Oneshot +where + S: Service, + S::Future: Unpin, +{} + +impl Future for Oneshot +where + S: Service, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + // Safety: The service's future is never moved once we get one. + let mut me = unsafe { Pin::get_unchecked_mut(self) }; + + loop { + match me.state { + State::NotReady(ref mut svc, _) => { + ready!(svc.poll_ready(cx))?; + // fallthrough out of the match's borrow + }, + State::Called(ref mut fut) => { + return unsafe { Pin::new_unchecked(fut) }.poll(cx); + }, + State::Tmp => unreachable!(), + } + + match mem::replace(&mut me.state, State::Tmp) { + State::NotReady(mut svc, req) => { + me.state = State::Called(svc.call(req)); + }, + _ => unreachable!(), + } + } + } +} diff --git a/tests/client.rs b/tests/client.rs index 7b12584387..9401cf1af9 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -939,7 +939,7 @@ mod dispatch_impl { use tokio_io::{AsyncRead, AsyncWrite}; use tokio_net::tcp::TcpStream; - use hyper::client::connect::{Connect, Connected, Destination, HttpConnector}; + use hyper::client::connect::{Connected, Destination, HttpConnector}; use hyper::Client; #[test] @@ -1688,6 +1688,7 @@ mod dispatch_impl { } + #[derive(Clone)] struct DebugConnector { http: HttpConnector, closes: mpsc::Sender<()>, @@ -1719,19 +1720,24 @@ mod dispatch_impl { } } - impl Connect for DebugConnector { - type Transport = DebugStream; + impl hyper::service::Service for DebugConnector { + type Response = (DebugStream, Connected); type Error = io::Error; type Future = Pin + Output = Result > + Send>>; - fn connect(&self, dst: Destination) -> Self::Future { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // don't forget to check inner service is ready :) + hyper::service::Service::::poll_ready(&mut self.http, cx) + } + + fn call(&mut self, dst: Destination) -> Self::Future { self.connects.fetch_add(1, Ordering::SeqCst); let closes = self.closes.clone(); let is_proxy = self.is_proxy; let is_alpn_h2 = self.alpn_h2; - Box::pin(self.http.connect(dst).map_ok(move |(s, mut c)| { + Box::pin(self.http.call(dst).map_ok(move |(s, mut c)| { if is_alpn_h2 { c = c.negotiated_h2(); }