Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): change Connect trait into an alias for Service #1912

Merged
merged 1 commit into from Oct 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
163 changes: 99 additions & 64 deletions src/client/connect/http.rs
Expand Up @@ -3,6 +3,7 @@ use std::error::Error as StdError;
use std::io;
use std::mem;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;

use http::uri::{Scheme, Uri};
Expand All @@ -13,7 +14,7 @@ use tokio_net::tcp::TcpStream;
use tokio_timer::{Delay, Timeout};

use crate::common::{Future, Pin, Poll, task};
use super::{Connect, Connected, Destination};
use super::{Connected, Destination};
use super::dns::{self, GaiResolver, Resolve};
#[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver;

Expand All @@ -30,17 +31,8 @@ type ConnectFuture = Pin<Box<dyn Future<Output = io::Result<TcpStream>> + Send>>
/// transport information such as the remote socket address used.
#[derive(Clone)]
pub struct HttpConnector<R = GaiResolver> {
enforce_http: bool,
handle: Option<Handle>,
connect_timeout: Option<Duration>,
happy_eyeballs_timeout: Option<Duration>,
keep_alive_timeout: Option<Duration>,
local_address: Option<IpAddr>,
nodelay: bool,
config: Arc<Config>,
resolver: R,
reuse_address: bool,
send_buffer_size: Option<usize>,
recv_buffer_size: Option<usize>,
}

/// Extra information about the transport when an HttpConnector is used.
Expand Down Expand Up @@ -76,6 +68,22 @@ pub struct HttpInfo {
remote_addr: SocketAddr,
}

#[derive(Clone)]
struct Config {
connect_timeout: Option<Duration>,
enforce_http: bool,
handle: Option<Handle>,
happy_eyeballs_timeout: Option<Duration>,
keep_alive_timeout: Option<Duration>,
local_address: Option<IpAddr>,
nodelay: bool,
reuse_address: bool,
send_buffer_size: Option<usize>,
recv_buffer_size: Option<usize>,
}

// ===== impl HttpConnector =====

impl HttpConnector {
/// Construct a new HttpConnector.
pub fn new() -> HttpConnector {
Expand All @@ -100,17 +108,19 @@ impl<R> HttpConnector<R> {
/// Takes a `Resolve` to handle DNS lookups.
pub fn new_with_resolver(resolver: R) -> HttpConnector<R> {
HttpConnector {
enforce_http: true,
handle: None,
connect_timeout: None,
happy_eyeballs_timeout: Some(Duration::from_millis(300)),
keep_alive_timeout: None,
local_address: None,
nodelay: false,
config: Arc::new(Config {
connect_timeout: None,
enforce_http: true,
handle: None,
happy_eyeballs_timeout: Some(Duration::from_millis(300)),
keep_alive_timeout: None,
local_address: None,
nodelay: false,
reuse_address: false,
send_buffer_size: None,
recv_buffer_size: None,
}),
resolver,
reuse_address: false,
send_buffer_size: None,
recv_buffer_size: None,
}
}

Expand All @@ -119,15 +129,15 @@ impl<R> HttpConnector<R> {
/// Enabled by default.
#[inline]
pub fn enforce_http(&mut self, is_enforced: bool) {
self.enforce_http = is_enforced;
self.config_mut().enforce_http = is_enforced;
}

/// Set a handle to a `Reactor` to register connections to.
///
/// If `None`, the implicit default reactor will be used.
#[inline]
pub fn set_reactor(&mut self, handle: Option<Handle>) {
self.handle = handle;
self.config_mut().handle = handle;
}

/// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration.
Expand All @@ -137,27 +147,27 @@ impl<R> HttpConnector<R> {
/// Default is `None`.
#[inline]
pub fn set_keepalive(&mut self, dur: Option<Duration>) {
self.keep_alive_timeout = dur;
self.config_mut().keep_alive_timeout = dur;
}

/// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`.
///
/// Default is `false`.
#[inline]
pub fn set_nodelay(&mut self, nodelay: bool) {
self.nodelay = nodelay;
self.config_mut().nodelay = nodelay;
}

/// Sets the value of the SO_SNDBUF option on the socket.
#[inline]
pub fn set_send_buffer_size(&mut self, size: Option<usize>) {
self.send_buffer_size = size;
self.config_mut().send_buffer_size = size;
}

/// Sets the value of the SO_RCVBUF option on the socket.
#[inline]
pub fn set_recv_buffer_size(&mut self, size: Option<usize>) {
self.recv_buffer_size = size;
self.config_mut().recv_buffer_size = size;
}

/// Set that all sockets are bound to the configured address before connection.
Expand All @@ -167,7 +177,7 @@ impl<R> HttpConnector<R> {
/// Default is `None`.
#[inline]
pub fn set_local_address(&mut self, addr: Option<IpAddr>) {
self.local_address = addr;
self.config_mut().local_address = addr;
}

/// Set the connect timeout.
Expand All @@ -178,7 +188,7 @@ impl<R> HttpConnector<R> {
/// Default is `None`.
#[inline]
pub fn set_connect_timeout(&mut self, dur: Option<Duration>) {
self.connect_timeout = dur;
self.config_mut().connect_timeout = dur;
}

/// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm.
Expand All @@ -195,17 +205,26 @@ impl<R> HttpConnector<R> {
/// [RFC 6555]: https://tools.ietf.org/html/rfc6555
#[inline]
pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) {
self.happy_eyeballs_timeout = dur;
self.config_mut().happy_eyeballs_timeout = dur;
}

/// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`.
///
/// Default is `false`.
#[inline]
pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self {
self.reuse_address = reuse_address;
self.config_mut().reuse_address = reuse_address;
self
}

// private

fn config_mut(&mut self) -> &mut Config {
// If the are HttpConnector clones, this will clone the inner
// config. So mutating the config won't ever affect previous
// clones.
Arc::make_mut(&mut self.config)
}
}

// R: Debug required for now to allow adding it to debug output later...
Expand All @@ -216,51 +235,59 @@ impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> {
}
}

impl<R> Connect for HttpConnector<R>
impl<R> tower_service::Service<Destination> for HttpConnector<R>
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
where
R: Resolve + Clone + Send + Sync,
R::Future: Send,
{
type Transport = TcpStream;
type Response = (TcpStream, Connected);
type Error = io::Error;
type Future = HttpConnecting<R>;

fn connect(&self, dst: Destination) -> Self::Future {
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// For now, always ready.
// TODO: When `Resolve` becomes an alias for `Service`, check
// the resolver's readiness.
drop(cx);
Poll::Ready(Ok(()))
}

fn call(&mut self, dst: Destination) -> Self::Future {
trace!(
"Http::connect; scheme={}, host={}, port={:?}",
dst.scheme(),
dst.host(),
dst.port(),
);

if self.enforce_http {
if self.config.enforce_http {
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
return invalid_url(InvalidUrl::NotHttp, &self.handle);
return invalid_url(InvalidUrl::NotHttp, &self.config.handle);
}
} else if dst.uri.scheme_part().is_none() {
return invalid_url(InvalidUrl::MissingScheme, &self.handle);
return invalid_url(InvalidUrl::MissingScheme, &self.config.handle);
}

let host = match dst.uri.host() {
Some(s) => s,
None => return invalid_url(InvalidUrl::MissingAuthority, &self.handle),
None => return invalid_url(InvalidUrl::MissingAuthority, &self.config.handle),
};
let port = match dst.uri.port_part() {
Some(port) => port.as_u16(),
None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
};

HttpConnecting {
state: State::Lazy(self.resolver.clone(), host.into(), self.local_address),
handle: self.handle.clone(),
connect_timeout: self.connect_timeout,
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay,
state: State::Lazy(self.resolver.clone(), host.into(), self.config.local_address),
handle: self.config.handle.clone(),
connect_timeout: self.config.connect_timeout,
happy_eyeballs_timeout: self.config.happy_eyeballs_timeout,
keep_alive_timeout: self.config.keep_alive_timeout,
nodelay: self.config.nodelay,
port,
reuse_address: self.reuse_address,
send_buffer_size: self.send_buffer_size,
recv_buffer_size: self.recv_buffer_size,
reuse_address: self.config.reuse_address,
send_buffer_size: self.config.send_buffer_size,
recv_buffer_size: self.config.recv_buffer_size,
}
}
}
Expand Down Expand Up @@ -289,34 +316,34 @@ where
dst.port(),
);

if self.enforce_http {
if self.config.enforce_http {
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
return invalid_url::<R>(InvalidUrl::NotHttp, &self.handle).map_ok(|(s, _)| s).boxed();
return invalid_url::<R>(InvalidUrl::NotHttp, &self.config.handle).map_ok(|(s, _)| s).boxed();
}
} else if dst.uri.scheme_part().is_none() {
return invalid_url::<R>(InvalidUrl::MissingScheme, &self.handle).map_ok(|(s, _)| s).boxed();
return invalid_url::<R>(InvalidUrl::MissingScheme, &self.config.handle).map_ok(|(s, _)| s).boxed();
}

let host = match dst.uri.host() {
Some(s) => s,
None => return invalid_url::<R>(InvalidUrl::MissingAuthority, &self.handle).map_ok(|(s, _)| s).boxed(),
None => return invalid_url::<R>(InvalidUrl::MissingAuthority, &self.config.handle).map_ok(|(s, _)| s).boxed(),
};
let port = match dst.uri.port_part() {
Some(port) => port.as_u16(),
None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
};

let fut = HttpConnecting {
state: State::Lazy(self.resolver.clone(), host.into(), self.local_address),
handle: self.handle.clone(),
connect_timeout: self.connect_timeout,
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay,
state: State::Lazy(self.resolver.clone(), host.into(), self.config.local_address),
handle: self.config.handle.clone(),
connect_timeout: self.config.connect_timeout,
happy_eyeballs_timeout: self.config.happy_eyeballs_timeout,
keep_alive_timeout: self.config.keep_alive_timeout,
nodelay: self.config.nodelay,
port,
reuse_address: self.reuse_address,
send_buffer_size: self.send_buffer_size,
recv_buffer_size: self.recv_buffer_size,
reuse_address: self.config.reuse_address,
send_buffer_size: self.config.send_buffer_size,
recv_buffer_size: self.config.recv_buffer_size,
};

fut.map_ok(|(s, _)| s).boxed()
Expand Down Expand Up @@ -671,7 +698,15 @@ mod tests {
use tokio::runtime::current_thread::Runtime;
use tokio_net::driver::Handle;

use super::{Connect, Destination, HttpConnector};
use super::{Connected, Destination, HttpConnector};
use super::super::sealed::Connect;

async fn connect<C>(connector: C, dst: Destination) -> Result<(C::Transport, Connected), C::Error>
where
C: Connect,
{
connector.connect(super::super::sealed::Internal, dst).await
}

#[test]
fn test_errors_missing_authority() {
Expand All @@ -684,7 +719,7 @@ mod tests {

rt.block_on(async {
assert_eq!(
connector.connect(dst).await.unwrap_err().kind(),
connect(connector, dst).await.unwrap_err().kind(),
io::ErrorKind::InvalidInput,
);
})
Expand All @@ -701,7 +736,7 @@ mod tests {

rt.block_on(async {
assert_eq!(
connector.connect(dst).await.unwrap_err().kind(),
connect(connector, dst).await.unwrap_err().kind(),
io::ErrorKind::InvalidInput,
);
})
Expand All @@ -718,7 +753,7 @@ mod tests {

rt.block_on(async {
assert_eq!(
connector.connect(dst).await.unwrap_err().kind(),
connect(connector, dst).await.unwrap_err().kind(),
io::ErrorKind::InvalidInput,
);
});
Expand Down