diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index bb72f81d63..c2bf9b590f 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -10,7 +10,7 @@ use futures_util::{TryFutureExt, FutureExt}; use net2::TcpBuilder; use tokio_net::driver::Handle; use tokio_net::tcp::TcpStream; -use tokio_timer::Delay; +use tokio_timer::{Delay, Timeout}; use crate::common::{Future, Pin, Poll, task}; use super::{Connect, Connected, Destination}; @@ -31,6 +31,7 @@ type ConnectFuture = Pin> + Send>> pub struct HttpConnector { enforce_http: bool, handle: Option, + connect_timeout: Option, happy_eyeballs_timeout: Option, keep_alive_timeout: Option, local_address: Option, @@ -99,6 +100,7 @@ impl HttpConnector { HttpConnector { enforce_http: true, handle: None, + connect_timeout: None, happy_eyeballs_timeout: Some(Duration::from_millis(300)), keep_alive_timeout: None, local_address: None, @@ -166,6 +168,21 @@ impl HttpConnector { self.local_address = addr; } + /// Set timeout for each attempt to connect to an IP address. + /// + /// If the hostname resolves to multiple IP addresses then this timeout is + /// applied to each individual connection attempt, ensuring that all the + /// addresses are given equal opportunity to respond. + /// + /// If `None`, then no timeout is applied by the connector, making it + /// subject to the timeout imposed by the operating system. + /// + /// Default is `None`. + #[inline] + pub fn set_connect_timeout(&mut self, dur: Option) { + self.connect_timeout = dur; + } + /// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm. /// /// If hostname resolves to both IPv4 and IPv6 addresses and connection @@ -238,6 +255,7 @@ where HttpConnecting { state: State::Lazy(self.resolver.clone(), host.into(), self.local_address), handle: self.handle.clone(), + connect_timeout: self.connect_timeout, happy_eyeballs_timeout: self.happy_eyeballs_timeout, keep_alive_timeout: self.keep_alive_timeout, nodelay: self.nodelay, @@ -293,6 +311,7 @@ where let fut = HttpConnecting { state: State::Lazy(self.resolver.clone(), host.into(), self.local_address), handle: self.handle.clone(), + connect_timeout: self.connect_timeout, happy_eyeballs_timeout: self.happy_eyeballs_timeout, keep_alive_timeout: self.keep_alive_timeout, nodelay: self.nodelay, @@ -321,6 +340,7 @@ fn invalid_url(err: InvalidUrl, handle: &Option) -> HttpConn keep_alive_timeout: None, nodelay: false, port: 0, + connect_timeout: None, happy_eyeballs_timeout: None, reuse_address: false, send_buffer_size: None, @@ -355,6 +375,7 @@ impl StdError for InvalidUrl { pub struct HttpConnecting { state: State, handle: Option, + connect_timeout: Option, happy_eyeballs_timeout: Option, keep_alive_timeout: Option, nodelay: bool, @@ -387,7 +408,12 @@ where // skip resolving the dns and start connecting right away. if let Some(addrs) = dns::IpAddrs::try_parse(host, me.port) { state = State::Connecting(ConnectingTcp::new( - local_addr, addrs, me.happy_eyeballs_timeout, me.reuse_address)); + local_addr, + addrs, + me.connect_timeout, + me.happy_eyeballs_timeout, + me.reuse_address, + )); } else { let name = dns::Name::new(mem::replace(host, String::new())); state = State::Resolving(resolver.resolve(name), local_addr); @@ -401,8 +427,13 @@ where .collect(); let addrs = dns::IpAddrs::new(addrs); state = State::Connecting(ConnectingTcp::new( - local_addr, addrs, me.happy_eyeballs_timeout, me.reuse_address)); - }, + local_addr, + addrs, + me.connect_timeout, + me.happy_eyeballs_timeout, + me.reuse_address, + )); + } State::Connecting(ref mut c) => { let sock = ready!(c.poll(cx, &me.handle))?; @@ -445,6 +476,7 @@ struct ConnectingTcp { local_addr: Option, preferred: ConnectingTcpRemote, fallback: Option, + connect_timeout: Option, reuse_address: bool, } @@ -452,6 +484,7 @@ impl ConnectingTcp { fn new( local_addr: Option, remote_addrs: dns::IpAddrs, + connect_timeout: Option, fallback_timeout: Option, reuse_address: bool, ) -> ConnectingTcp { @@ -462,6 +495,7 @@ impl ConnectingTcp { local_addr, preferred: ConnectingTcpRemote::new(preferred_addrs), fallback: None, + connect_timeout, reuse_address, }; } @@ -473,6 +507,7 @@ impl ConnectingTcp { delay: tokio_timer::delay_for(fallback_timeout), remote: ConnectingTcpRemote::new(fallback_addrs), }), + connect_timeout, reuse_address, } } else { @@ -480,6 +515,7 @@ impl ConnectingTcp { local_addr, preferred: ConnectingTcpRemote::new(remote_addrs), fallback: None, + connect_timeout, reuse_address, } } @@ -512,6 +548,7 @@ impl ConnectingTcpRemote { cx: &mut task::Context<'_>, local_addr: &Option, handle: &Option, + connect_timeout: Option, reuse_address: bool, ) -> Poll> { let mut err = None; @@ -528,14 +565,20 @@ impl ConnectingTcpRemote { err = Some(e); if let Some(addr) = self.addrs.next() { debug!("connecting to {}", addr); - *current = connect(&addr, local_addr, handle, reuse_address)?; + *current = connect(&addr, local_addr, handle, connect_timeout, reuse_address)?; continue; } } } } else if let Some(addr) = self.addrs.next() { debug!("connecting to {}", addr); - self.current = Some(connect(&addr, local_addr, handle, reuse_address)?); + self.current = Some(connect( + &addr, + local_addr, + handle, + connect_timeout, + reuse_address, + )?); continue; } @@ -544,7 +587,13 @@ impl ConnectingTcpRemote { } } -fn connect(addr: &SocketAddr, local_addr: &Option, handle: &Option, reuse_address: bool) -> io::Result { +fn connect( + addr: &SocketAddr, + local_addr: &Option, + handle: &Option, + connect_timeout: Option, + reuse_address: bool, +) -> io::Result { let builder = match addr { &SocketAddr::V4(_) => TcpBuilder::new_v4()?, &SocketAddr::V6(_) => TcpBuilder::new_v6()?, @@ -579,7 +628,12 @@ fn connect(addr: &SocketAddr, local_addr: &Option, handle: &Option, handle: &Option, handle: &Option) -> Poll> { match self.fallback.take() { - None => self.preferred.poll(cx, &self.local_addr, handle, self.reuse_address), - Some(mut fallback) => match self.preferred.poll(cx, &self.local_addr, handle, self.reuse_address) { + None => self.preferred.poll( + cx, + &self.local_addr, + handle, + self.connect_timeout, + self.reuse_address, + ), + Some(mut fallback) => match self.preferred.poll( + cx, + &self.local_addr, + handle, + self.connect_timeout, + self.reuse_address, + ) { Poll::Ready(Ok(stream)) => { // Preferred successful - drop fallback. Poll::Ready(Ok(stream)) } Poll::Pending => match Pin::new(&mut fallback.delay).poll(cx) { - Poll::Ready(()) => match fallback.remote.poll(cx, &self.local_addr, handle, self.reuse_address) { + Poll::Ready(()) => match fallback.remote.poll( + cx, + &self.local_addr, + handle, + self.connect_timeout, + self.reuse_address, + ) { Poll::Ready(Ok(stream)) => { // Fallback successful - drop current preferred, // but keep fallback as new preferred. @@ -621,7 +693,13 @@ impl ConnectingTcp { Poll::Ready(Err(_)) => { // Preferred failed - use fallback as new preferred. self.preferred = fallback.remote; - self.preferred.poll(cx, &self.local_addr, handle, self.reuse_address) + self.preferred.poll( + cx, + &self.local_addr, + handle, + self.connect_timeout, + self.reuse_address, + ) } } } @@ -631,6 +709,7 @@ impl ConnectingTcp { #[cfg(test)] mod tests { use std::io; + use std::time::{Duration, Instant}; use tokio::runtime::current_thread::Runtime; use tokio_net::driver::Handle; @@ -689,13 +768,46 @@ mod tests { }); } + #[test] + fn test_connect_timeout() { + use std::io::ErrorKind; + + let mut rt = Runtime::new().unwrap(); + + // 240.0.0.1 is reserved in IPv4 and is effectively a black hole. + // 100::1 is an official IPv6 black hole but Travis CI has no IPv6. + let uri = "http://240.0.0.1/".parse().unwrap(); + let dst = Destination { + uri, + }; + + let mut connector = HttpConnector::new(); + let timeout = Duration::from_millis(1000); + connector.set_connect_timeout(Some(timeout)); + + let start = Instant::now(); + let res = rt.block_on(connector.connect(dst)); + let duration = start.elapsed(); + + match res { + Ok(_) => panic!("Request succeeded but should have timed out"), + Err(error) => if ErrorKind::TimedOut == error.kind() { + // Allow actual duration to be +/- 150ms off. + let allowance = Duration::from_millis(150); + assert!(duration >= timeout - allowance); + assert!(duration <= timeout + allowance); + } else { + panic!("{:?}", error); + }, + } + } + #[test] #[cfg_attr(not(feature = "__internal_happy_eyeballs_tests"), ignore)] fn client_happy_eyeballs() { use std::future::Future; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener}; use std::task::Poll; - use std::time::{Duration, Instant}; use tokio::runtime::current_thread::Runtime; @@ -763,7 +875,7 @@ mod tests { } let addrs = hosts.iter().map(|host| (host.clone(), addr.port()).into()).collect(); - let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), Some(fallback_timeout), false); + let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), None, Some(fallback_timeout), false); let fut = ConnectingTcpFuture(connecting_tcp); let start = Instant::now();