From a47b1f015e1c34a7ca4fdf836735116a0d6e60a3 Mon Sep 17 00:00:00 2001 From: James Le Cuirot Date: Fri, 27 Sep 2019 18:38:14 +0100 Subject: [PATCH] feat(client): add per IP address connection timeout It has previously been recommended to apply timeouts externally to hyper. The hyper-timeout crate uses a special connector to apply read and write timeouts but the connection timeout is still external. This approach has one major drawback that only an implementation within hyper can provide. If the requested hostname resolves to multiple IP addresses, hyper internally tries to connect to each one in turn. If one of them doesn't respond at all then it hangs until the OS-level timeout kicks in. This defaults to 127 seconds on Linux due to tcp_syn_retries being 6. hyper-timeout wraps around all the connection attempts rather than each one individually so setting that timeout to something shorter means that it will give up before trying the second address. You could reduce tcp_syn_retries but this is kernel-wide so this is far from ideal. Therefore add a set_connect_timeout function to HttpConnector. It defaults to None, preserving the earlier behaviour. Closes #1234 --- src/client/connect/http.rs | 140 +++++++++++++++++++++++++++++++++---- 1 file changed, 126 insertions(+), 14 deletions(-) diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index bb72f81d63..c2bf9b590f 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -10,7 +10,7 @@ use futures_util::{TryFutureExt, FutureExt}; use net2::TcpBuilder; use tokio_net::driver::Handle; use tokio_net::tcp::TcpStream; -use tokio_timer::Delay; +use tokio_timer::{Delay, Timeout}; use crate::common::{Future, Pin, Poll, task}; use super::{Connect, Connected, Destination}; @@ -31,6 +31,7 @@ type ConnectFuture = Pin> + Send>> pub struct HttpConnector { enforce_http: bool, handle: Option, + connect_timeout: Option, happy_eyeballs_timeout: Option, keep_alive_timeout: Option, local_address: Option, @@ -99,6 +100,7 @@ impl HttpConnector { HttpConnector { enforce_http: true, handle: None, + connect_timeout: None, happy_eyeballs_timeout: Some(Duration::from_millis(300)), keep_alive_timeout: None, local_address: None, @@ -166,6 +168,21 @@ impl HttpConnector { self.local_address = addr; } + /// Set timeout for each attempt to connect to an IP address. + /// + /// If the hostname resolves to multiple IP addresses then this timeout is + /// applied to each individual connection attempt, ensuring that all the + /// addresses are given equal opportunity to respond. + /// + /// If `None`, then no timeout is applied by the connector, making it + /// subject to the timeout imposed by the operating system. + /// + /// Default is `None`. + #[inline] + pub fn set_connect_timeout(&mut self, dur: Option) { + self.connect_timeout = dur; + } + /// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm. /// /// If hostname resolves to both IPv4 and IPv6 addresses and connection @@ -238,6 +255,7 @@ where HttpConnecting { state: State::Lazy(self.resolver.clone(), host.into(), self.local_address), handle: self.handle.clone(), + connect_timeout: self.connect_timeout, happy_eyeballs_timeout: self.happy_eyeballs_timeout, keep_alive_timeout: self.keep_alive_timeout, nodelay: self.nodelay, @@ -293,6 +311,7 @@ where let fut = HttpConnecting { state: State::Lazy(self.resolver.clone(), host.into(), self.local_address), handle: self.handle.clone(), + connect_timeout: self.connect_timeout, happy_eyeballs_timeout: self.happy_eyeballs_timeout, keep_alive_timeout: self.keep_alive_timeout, nodelay: self.nodelay, @@ -321,6 +340,7 @@ fn invalid_url(err: InvalidUrl, handle: &Option) -> HttpConn keep_alive_timeout: None, nodelay: false, port: 0, + connect_timeout: None, happy_eyeballs_timeout: None, reuse_address: false, send_buffer_size: None, @@ -355,6 +375,7 @@ impl StdError for InvalidUrl { pub struct HttpConnecting { state: State, handle: Option, + connect_timeout: Option, happy_eyeballs_timeout: Option, keep_alive_timeout: Option, nodelay: bool, @@ -387,7 +408,12 @@ where // skip resolving the dns and start connecting right away. if let Some(addrs) = dns::IpAddrs::try_parse(host, me.port) { state = State::Connecting(ConnectingTcp::new( - local_addr, addrs, me.happy_eyeballs_timeout, me.reuse_address)); + local_addr, + addrs, + me.connect_timeout, + me.happy_eyeballs_timeout, + me.reuse_address, + )); } else { let name = dns::Name::new(mem::replace(host, String::new())); state = State::Resolving(resolver.resolve(name), local_addr); @@ -401,8 +427,13 @@ where .collect(); let addrs = dns::IpAddrs::new(addrs); state = State::Connecting(ConnectingTcp::new( - local_addr, addrs, me.happy_eyeballs_timeout, me.reuse_address)); - }, + local_addr, + addrs, + me.connect_timeout, + me.happy_eyeballs_timeout, + me.reuse_address, + )); + } State::Connecting(ref mut c) => { let sock = ready!(c.poll(cx, &me.handle))?; @@ -445,6 +476,7 @@ struct ConnectingTcp { local_addr: Option, preferred: ConnectingTcpRemote, fallback: Option, + connect_timeout: Option, reuse_address: bool, } @@ -452,6 +484,7 @@ impl ConnectingTcp { fn new( local_addr: Option, remote_addrs: dns::IpAddrs, + connect_timeout: Option, fallback_timeout: Option, reuse_address: bool, ) -> ConnectingTcp { @@ -462,6 +495,7 @@ impl ConnectingTcp { local_addr, preferred: ConnectingTcpRemote::new(preferred_addrs), fallback: None, + connect_timeout, reuse_address, }; } @@ -473,6 +507,7 @@ impl ConnectingTcp { delay: tokio_timer::delay_for(fallback_timeout), remote: ConnectingTcpRemote::new(fallback_addrs), }), + connect_timeout, reuse_address, } } else { @@ -480,6 +515,7 @@ impl ConnectingTcp { local_addr, preferred: ConnectingTcpRemote::new(remote_addrs), fallback: None, + connect_timeout, reuse_address, } } @@ -512,6 +548,7 @@ impl ConnectingTcpRemote { cx: &mut task::Context<'_>, local_addr: &Option, handle: &Option, + connect_timeout: Option, reuse_address: bool, ) -> Poll> { let mut err = None; @@ -528,14 +565,20 @@ impl ConnectingTcpRemote { err = Some(e); if let Some(addr) = self.addrs.next() { debug!("connecting to {}", addr); - *current = connect(&addr, local_addr, handle, reuse_address)?; + *current = connect(&addr, local_addr, handle, connect_timeout, reuse_address)?; continue; } } } } else if let Some(addr) = self.addrs.next() { debug!("connecting to {}", addr); - self.current = Some(connect(&addr, local_addr, handle, reuse_address)?); + self.current = Some(connect( + &addr, + local_addr, + handle, + connect_timeout, + reuse_address, + )?); continue; } @@ -544,7 +587,13 @@ impl ConnectingTcpRemote { } } -fn connect(addr: &SocketAddr, local_addr: &Option, handle: &Option, reuse_address: bool) -> io::Result { +fn connect( + addr: &SocketAddr, + local_addr: &Option, + handle: &Option, + connect_timeout: Option, + reuse_address: bool, +) -> io::Result { let builder = match addr { &SocketAddr::V4(_) => TcpBuilder::new_v4()?, &SocketAddr::V6(_) => TcpBuilder::new_v6()?, @@ -579,7 +628,12 @@ fn connect(addr: &SocketAddr, local_addr: &Option, handle: &Option, handle: &Option, handle: &Option) -> Poll> { match self.fallback.take() { - None => self.preferred.poll(cx, &self.local_addr, handle, self.reuse_address), - Some(mut fallback) => match self.preferred.poll(cx, &self.local_addr, handle, self.reuse_address) { + None => self.preferred.poll( + cx, + &self.local_addr, + handle, + self.connect_timeout, + self.reuse_address, + ), + Some(mut fallback) => match self.preferred.poll( + cx, + &self.local_addr, + handle, + self.connect_timeout, + self.reuse_address, + ) { Poll::Ready(Ok(stream)) => { // Preferred successful - drop fallback. Poll::Ready(Ok(stream)) } Poll::Pending => match Pin::new(&mut fallback.delay).poll(cx) { - Poll::Ready(()) => match fallback.remote.poll(cx, &self.local_addr, handle, self.reuse_address) { + Poll::Ready(()) => match fallback.remote.poll( + cx, + &self.local_addr, + handle, + self.connect_timeout, + self.reuse_address, + ) { Poll::Ready(Ok(stream)) => { // Fallback successful - drop current preferred, // but keep fallback as new preferred. @@ -621,7 +693,13 @@ impl ConnectingTcp { Poll::Ready(Err(_)) => { // Preferred failed - use fallback as new preferred. self.preferred = fallback.remote; - self.preferred.poll(cx, &self.local_addr, handle, self.reuse_address) + self.preferred.poll( + cx, + &self.local_addr, + handle, + self.connect_timeout, + self.reuse_address, + ) } } } @@ -631,6 +709,7 @@ impl ConnectingTcp { #[cfg(test)] mod tests { use std::io; + use std::time::{Duration, Instant}; use tokio::runtime::current_thread::Runtime; use tokio_net::driver::Handle; @@ -689,13 +768,46 @@ mod tests { }); } + #[test] + fn test_connect_timeout() { + use std::io::ErrorKind; + + let mut rt = Runtime::new().unwrap(); + + // 240.0.0.1 is reserved in IPv4 and is effectively a black hole. + // 100::1 is an official IPv6 black hole but Travis CI has no IPv6. + let uri = "http://240.0.0.1/".parse().unwrap(); + let dst = Destination { + uri, + }; + + let mut connector = HttpConnector::new(); + let timeout = Duration::from_millis(1000); + connector.set_connect_timeout(Some(timeout)); + + let start = Instant::now(); + let res = rt.block_on(connector.connect(dst)); + let duration = start.elapsed(); + + match res { + Ok(_) => panic!("Request succeeded but should have timed out"), + Err(error) => if ErrorKind::TimedOut == error.kind() { + // Allow actual duration to be +/- 150ms off. + let allowance = Duration::from_millis(150); + assert!(duration >= timeout - allowance); + assert!(duration <= timeout + allowance); + } else { + panic!("{:?}", error); + }, + } + } + #[test] #[cfg_attr(not(feature = "__internal_happy_eyeballs_tests"), ignore)] fn client_happy_eyeballs() { use std::future::Future; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener}; use std::task::Poll; - use std::time::{Duration, Instant}; use tokio::runtime::current_thread::Runtime; @@ -763,7 +875,7 @@ mod tests { } let addrs = hosts.iter().map(|host| (host.clone(), addr.port()).into()).collect(); - let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), Some(fallback_timeout), false); + let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), None, Some(fallback_timeout), false); let fut = ConnectingTcpFuture(connecting_tcp); let start = Instant::now();