From 3bd6259ef78bde3471c94b0ee43fbcfe9998baf1 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 21 Oct 2019 17:26:21 -0700 Subject: [PATCH] feat(client): change `Connect` trait into an alias for `Service` The `Connect` trait is now essentially an alias for `Service`, with a blanket implementation as such, and is sealed. Closes #1902 BREAKING CHANGE: Any manual implementations of `Connect` must instead implement `tower::Service`. --- src/client/connect/http.rs | 163 ++++++++++++++++++++++--------------- src/client/connect/mod.rs | 51 +++++++++--- src/client/mod.rs | 16 ++-- src/service/mod.rs | 3 + src/service/oneshot.rs | 70 ++++++++++++++++ 5 files changed, 219 insertions(+), 84 deletions(-) create mode 100644 src/service/oneshot.rs diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index ad60fd7d7b..a8e3e784a8 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -3,6 +3,7 @@ use std::error::Error as StdError; use std::io; use std::mem; use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; use std::time::Duration; use http::uri::{Scheme, Uri}; @@ -13,7 +14,7 @@ use tokio_net::tcp::TcpStream; use tokio_timer::{Delay, Timeout}; use crate::common::{Future, Pin, Poll, task}; -use super::{Connect, Connected, Destination}; +use super::{Connected, Destination}; use super::dns::{self, GaiResolver, Resolve}; #[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver; @@ -30,17 +31,8 @@ type ConnectFuture = Pin> + Send>> /// transport information such as the remote socket address used. #[derive(Clone)] pub struct HttpConnector { - enforce_http: bool, - handle: Option, - connect_timeout: Option, - happy_eyeballs_timeout: Option, - keep_alive_timeout: Option, - local_address: Option, - nodelay: bool, + config: Arc, resolver: R, - reuse_address: bool, - send_buffer_size: Option, - recv_buffer_size: Option, } /// Extra information about the transport when an HttpConnector is used. @@ -76,6 +68,22 @@ pub struct HttpInfo { remote_addr: SocketAddr, } +#[derive(Clone)] +struct Config { + connect_timeout: Option, + enforce_http: bool, + handle: Option, + happy_eyeballs_timeout: Option, + keep_alive_timeout: Option, + local_address: Option, + nodelay: bool, + reuse_address: bool, + send_buffer_size: Option, + recv_buffer_size: Option, +} + +// ===== impl HttpConnector ===== + impl HttpConnector { /// Construct a new HttpConnector. pub fn new() -> HttpConnector { @@ -100,17 +108,19 @@ impl HttpConnector { /// Takes a `Resolve` to handle DNS lookups. pub fn new_with_resolver(resolver: R) -> HttpConnector { HttpConnector { - enforce_http: true, - handle: None, - connect_timeout: None, - happy_eyeballs_timeout: Some(Duration::from_millis(300)), - keep_alive_timeout: None, - local_address: None, - nodelay: false, + config: Arc::new(Config { + connect_timeout: None, + enforce_http: true, + handle: None, + happy_eyeballs_timeout: Some(Duration::from_millis(300)), + keep_alive_timeout: None, + local_address: None, + nodelay: false, + reuse_address: false, + send_buffer_size: None, + recv_buffer_size: None, + }), resolver, - reuse_address: false, - send_buffer_size: None, - recv_buffer_size: None, } } @@ -119,7 +129,7 @@ impl HttpConnector { /// Enabled by default. #[inline] pub fn enforce_http(&mut self, is_enforced: bool) { - self.enforce_http = is_enforced; + self.config_mut().enforce_http = is_enforced; } /// Set a handle to a `Reactor` to register connections to. @@ -127,7 +137,7 @@ impl HttpConnector { /// If `None`, the implicit default reactor will be used. #[inline] pub fn set_reactor(&mut self, handle: Option) { - self.handle = handle; + self.config_mut().handle = handle; } /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration. @@ -137,7 +147,7 @@ impl HttpConnector { /// Default is `None`. #[inline] pub fn set_keepalive(&mut self, dur: Option) { - self.keep_alive_timeout = dur; + self.config_mut().keep_alive_timeout = dur; } /// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`. @@ -145,19 +155,19 @@ impl HttpConnector { /// Default is `false`. #[inline] pub fn set_nodelay(&mut self, nodelay: bool) { - self.nodelay = nodelay; + self.config_mut().nodelay = nodelay; } /// Sets the value of the SO_SNDBUF option on the socket. #[inline] pub fn set_send_buffer_size(&mut self, size: Option) { - self.send_buffer_size = size; + self.config_mut().send_buffer_size = size; } /// Sets the value of the SO_RCVBUF option on the socket. #[inline] pub fn set_recv_buffer_size(&mut self, size: Option) { - self.recv_buffer_size = size; + self.config_mut().recv_buffer_size = size; } /// Set that all sockets are bound to the configured address before connection. @@ -167,7 +177,7 @@ impl HttpConnector { /// Default is `None`. #[inline] pub fn set_local_address(&mut self, addr: Option) { - self.local_address = addr; + self.config_mut().local_address = addr; } /// Set the connect timeout. @@ -178,7 +188,7 @@ impl HttpConnector { /// Default is `None`. #[inline] pub fn set_connect_timeout(&mut self, dur: Option) { - self.connect_timeout = dur; + self.config_mut().connect_timeout = dur; } /// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm. @@ -195,7 +205,7 @@ impl HttpConnector { /// [RFC 6555]: https://tools.ietf.org/html/rfc6555 #[inline] pub fn set_happy_eyeballs_timeout(&mut self, dur: Option) { - self.happy_eyeballs_timeout = dur; + self.config_mut().happy_eyeballs_timeout = dur; } /// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`. @@ -203,9 +213,18 @@ impl HttpConnector { /// Default is `false`. #[inline] pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self { - self.reuse_address = reuse_address; + self.config_mut().reuse_address = reuse_address; self } + + // private + + fn config_mut(&mut self) -> &mut Config { + // If the are HttpConnector clones, this will clone the inner + // config. So mutating the config won't ever affect previous + // clones. + Arc::make_mut(&mut self.config) + } } // R: Debug required for now to allow adding it to debug output later... @@ -216,16 +235,24 @@ impl fmt::Debug for HttpConnector { } } -impl Connect for HttpConnector +impl tower_service::Service for HttpConnector where R: Resolve + Clone + Send + Sync, R::Future: Send, { - type Transport = TcpStream; + type Response = (TcpStream, Connected); type Error = io::Error; type Future = HttpConnecting; - fn connect(&self, dst: Destination) -> Self::Future { + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + // For now, always ready. + // TODO: When `Resolve` becomes an alias for `Service`, check + // the resolver's readiness. + drop(cx); + Poll::Ready(Ok(())) + } + + fn call(&mut self, dst: Destination) -> Self::Future { trace!( "Http::connect; scheme={}, host={}, port={:?}", dst.scheme(), @@ -233,17 +260,17 @@ where dst.port(), ); - if self.enforce_http { + if self.config.enforce_http { if dst.uri.scheme_part() != Some(&Scheme::HTTP) { - return invalid_url(InvalidUrl::NotHttp, &self.handle); + return invalid_url(InvalidUrl::NotHttp, &self.config.handle); } } else if dst.uri.scheme_part().is_none() { - return invalid_url(InvalidUrl::MissingScheme, &self.handle); + return invalid_url(InvalidUrl::MissingScheme, &self.config.handle); } let host = match dst.uri.host() { Some(s) => s, - None => return invalid_url(InvalidUrl::MissingAuthority, &self.handle), + None => return invalid_url(InvalidUrl::MissingAuthority, &self.config.handle), }; let port = match dst.uri.port_part() { Some(port) => port.as_u16(), @@ -251,16 +278,16 @@ where }; HttpConnecting { - state: State::Lazy(self.resolver.clone(), host.into(), self.local_address), - handle: self.handle.clone(), - connect_timeout: self.connect_timeout, - happy_eyeballs_timeout: self.happy_eyeballs_timeout, - keep_alive_timeout: self.keep_alive_timeout, - nodelay: self.nodelay, + state: State::Lazy(self.resolver.clone(), host.into(), self.config.local_address), + handle: self.config.handle.clone(), + connect_timeout: self.config.connect_timeout, + happy_eyeballs_timeout: self.config.happy_eyeballs_timeout, + keep_alive_timeout: self.config.keep_alive_timeout, + nodelay: self.config.nodelay, port, - reuse_address: self.reuse_address, - send_buffer_size: self.send_buffer_size, - recv_buffer_size: self.recv_buffer_size, + reuse_address: self.config.reuse_address, + send_buffer_size: self.config.send_buffer_size, + recv_buffer_size: self.config.recv_buffer_size, } } } @@ -289,17 +316,17 @@ where dst.port(), ); - if self.enforce_http { + if self.config.enforce_http { if dst.uri.scheme_part() != Some(&Scheme::HTTP) { - return invalid_url::(InvalidUrl::NotHttp, &self.handle).map_ok(|(s, _)| s).boxed(); + return invalid_url::(InvalidUrl::NotHttp, &self.config.handle).map_ok(|(s, _)| s).boxed(); } } else if dst.uri.scheme_part().is_none() { - return invalid_url::(InvalidUrl::MissingScheme, &self.handle).map_ok(|(s, _)| s).boxed(); + return invalid_url::(InvalidUrl::MissingScheme, &self.config.handle).map_ok(|(s, _)| s).boxed(); } let host = match dst.uri.host() { Some(s) => s, - None => return invalid_url::(InvalidUrl::MissingAuthority, &self.handle).map_ok(|(s, _)| s).boxed(), + None => return invalid_url::(InvalidUrl::MissingAuthority, &self.config.handle).map_ok(|(s, _)| s).boxed(), }; let port = match dst.uri.port_part() { Some(port) => port.as_u16(), @@ -307,16 +334,16 @@ where }; let fut = HttpConnecting { - state: State::Lazy(self.resolver.clone(), host.into(), self.local_address), - handle: self.handle.clone(), - connect_timeout: self.connect_timeout, - happy_eyeballs_timeout: self.happy_eyeballs_timeout, - keep_alive_timeout: self.keep_alive_timeout, - nodelay: self.nodelay, + state: State::Lazy(self.resolver.clone(), host.into(), self.config.local_address), + handle: self.config.handle.clone(), + connect_timeout: self.config.connect_timeout, + happy_eyeballs_timeout: self.config.happy_eyeballs_timeout, + keep_alive_timeout: self.config.keep_alive_timeout, + nodelay: self.config.nodelay, port, - reuse_address: self.reuse_address, - send_buffer_size: self.send_buffer_size, - recv_buffer_size: self.recv_buffer_size, + reuse_address: self.config.reuse_address, + send_buffer_size: self.config.send_buffer_size, + recv_buffer_size: self.config.recv_buffer_size, }; fut.map_ok(|(s, _)| s).boxed() @@ -671,7 +698,15 @@ mod tests { use tokio::runtime::current_thread::Runtime; use tokio_net::driver::Handle; - use super::{Connect, Destination, HttpConnector}; + use super::{Connected, Destination, HttpConnector}; + use super::super::Connect; + + async fn connect(connector: C, dst: Destination) -> Result<(C::Transport, Connected), C::Error> + where + C: Connect, + { + connector.connect(super::super::sealed::Internal, dst).await + } #[test] fn test_errors_missing_authority() { @@ -684,7 +719,7 @@ mod tests { rt.block_on(async { assert_eq!( - connector.connect(dst).await.unwrap_err().kind(), + connect(connector, dst).await.unwrap_err().kind(), io::ErrorKind::InvalidInput, ); }) @@ -701,7 +736,7 @@ mod tests { rt.block_on(async { assert_eq!( - connector.connect(dst).await.unwrap_err().kind(), + connect(connector, dst).await.unwrap_err().kind(), io::ErrorKind::InvalidInput, ); }) @@ -718,7 +753,7 @@ mod tests { rt.block_on(async { assert_eq!( - connector.connect(dst).await.unwrap_err().kind(), + connect(connector, dst).await.unwrap_err().kind(), io::ErrorKind::InvalidInput, ); }); diff --git a/src/client/connect/mod.rs b/src/client/connect/mod.rs index 39429764dc..779e5773e2 100644 --- a/src/client/connect/mod.rs +++ b/src/client/connect/mod.rs @@ -24,26 +24,49 @@ use crate::common::{Future, Unpin}; /// A connector receives a [`Destination`](Destination) describing how a /// connection should be estabilished, and returns a `Future` of the /// ready connection. -pub trait Connect: Send + Sync { +/// +/// # Trait Alias +/// +/// This is really just an *alias* for the `tower::Service` trait, with +/// additional bounds set for convenience *inside* hyper. You don't actually +/// implement this trait, but `tower::Service` instead. +// The `Sized` bound is to prevent creating `dyn Connect`, since they cannot +// fit the `Connect` bounds because of the blanket impl for `Service`. +pub trait Connect: sealed::Sealed + Sized { /// The connected IO Stream. - type Transport: AsyncRead + AsyncWrite + Unpin + Send + 'static; + type Transport: AsyncRead + AsyncWrite; /// An error occured when trying to connect. type Error: Into>; /// A Future that will resolve to the connected Transport. - type Future: Future> + Unpin + Send; - /// Connect to a destination. - fn connect(&self, dst: Destination) -> Self::Future; + type Future: Future>; + #[doc(hidden)] + fn connect(self, internal_only: sealed::Internal, dst: Destination) -> Self::Future; } -impl Connect for Box { - type Transport = ::Transport; - type Error = ::Error; - type Future = ::Future; - fn connect(&self, dst: Destination) -> Self::Future { - ::connect(self, dst) +impl Connect for S +where + S: tower_service::Service + Send, + S::Error: Into>, + S::Future: Unpin + Send, + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + type Transport = T; + type Error = S::Error; + type Future = crate::service::Oneshot; + fn connect(self, _: sealed::Internal, dst: Destination) -> Self::Future { + crate::service::oneshot(self, dst) } } +impl sealed::Sealed for S +where + S: tower_service::Service + Send, + S::Error: Into>, + S::Future: Unpin + Send, + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{} + + /// A set of properties to describe where and how to try to connect. /// /// This type is passed an argument for the [`Connect`](Connect) trait. @@ -398,6 +421,12 @@ where } } +pub(super) mod sealed { + pub trait Sealed {} + #[allow(missing_debug_implementations)] + pub struct Internal; +} + #[cfg(test)] mod tests { use super::{Connected, Destination, TryFrom}; diff --git a/src/client/mod.rs b/src/client/mod.rs index 318d804572..278a80be89 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -89,7 +89,7 @@ mod tests; pub struct Client { config: Config, conn_builder: conn::Builder, - connector: Arc, + connector: C, pool: Pool>, } @@ -158,9 +158,9 @@ impl Client<(), Body> { } impl Client -where C: Connect + Sync + 'static, - C::Transport: 'static, - C::Future: 'static, +where C: Connect + Clone + Send + Sync + 'static, + C::Transport: Unpin + Send + 'static, + C::Future: Unpin + Send + 'static, B: Payload + Unpin + Send + 'static, B::Data: Send + Unpin, { @@ -486,7 +486,7 @@ where C: Connect + Sync + 'static, return Either::Right(future::err(canceled)); } }; - Either::Left(connector.connect(dst) + Either::Left(connector.connect(connect::sealed::Internal, dst) .map_err(crate::Error::new_connect) .and_then(move |(io, connected)| { // If ALPN is h2 and we aren't http2_only already, @@ -544,7 +544,7 @@ where C: Connect + Sync + 'static, } } -impl Clone for Client { +impl Clone for Client { fn clone(&self) -> Client { Client { config: self.config.clone(), @@ -1039,15 +1039,13 @@ impl Builder { pub fn build(&self, connector: C) -> Client where C: Connect, - C::Transport: 'static, - C::Future: 'static, B: Payload + Send, B::Data: Send, { Client { config: self.client_config, conn_builder: self.conn_builder.clone(), - connector: Arc::new(connector), + connector, pool: Pool::new(self.pool_config, &self.conn_builder.exec), } } diff --git a/src/service/mod.rs b/src/service/mod.rs index 82d7f10325..26d21b9188 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -38,10 +38,13 @@ pub use tower_service::Service; mod http; mod make; +mod oneshot; mod util; pub(crate) use self::make::{MakeConnection, MakeServiceRef}; pub(crate) use self::http::HttpService; +pub(crate) use self::oneshot::{oneshot, Oneshot}; pub use self::make::make_service_fn; pub use self::util::service_fn; + diff --git a/src/service/oneshot.rs b/src/service/oneshot.rs new file mode 100644 index 0000000000..1cf4ea5348 --- /dev/null +++ b/src/service/oneshot.rs @@ -0,0 +1,70 @@ +// TODO: Eventually to be replaced with tower_util::Oneshot. + +use std::mem; +use std::marker::Unpin; + +use tower_service::Service; + +use crate::common::{Future, Pin, Poll, task}; + +pub(crate) fn oneshot(svc: S, req: Req) -> Oneshot +where + S: Service, +{ + Oneshot { + state: State::NotReady(svc, req), + } +} + +// A `Future` consuming a `Service` and request, waiting until the `Service` +// is ready, and then calling `Service::call` with the request, and +// waiting for that `Future`. +#[allow(missing_debug_implementations)] +pub struct Oneshot, Req> { + state: State, +} + +enum State, Req> { + NotReady(S, Req), + Called(S::Future), + Tmp, +} + +// Unpin is projected to S::Future, but never S. +impl Unpin for Oneshot +where + S: Service, + S::Future: Unpin, +{} + +impl Future for Oneshot +where + S: Service, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + // Safety: The service's future is never moved once we get one. + let mut me = unsafe { Pin::get_unchecked_mut(self) }; + + loop { + match me.state { + State::NotReady(ref mut svc, _) => { + ready!(svc.poll_ready(cx))?; + // fallthrough out of the match's borrow + }, + State::Called(ref mut fut) => { + return unsafe { Pin::new_unchecked(fut) }.poll(cx); + }, + State::Tmp => unreachable!(), + } + + match mem::replace(&mut me.state, State::Tmp) { + State::NotReady(mut svc, req) => { + me.state = State::Called(svc.call(req)); + }, + _ => unreachable!(), + } + } + } +}