From a80471ae6143771644e8fc2902bc8323737bb79f Mon Sep 17 00:00:00 2001 From: Keith Wansbrough Date: Thu, 26 Apr 2018 08:43:05 +0100 Subject: [PATCH] feat(client): support local bind for HttpConnector Add `set_local_address` to the `HttpConnector`. This configures the client to bind the socket to a local address of the host before it connects to the destination. This is useful on hosts which have multiple network interfaces, to ensure the request is issued over a specific interface. Closes #1498 --- Cargo.toml | 5 +++ src/client/connect.rs | 81 +++++++++++++++++++++++++++---------------- 2 files changed, 56 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b263e34012..32a71602c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,11 @@ runtime = [ "tokio-tcp", ] +[[example]] +name = "bound_client" +path = "examples/bound_client.rs" +required-features = ["runtime"] + [[example]] name = "client" path = "examples/client.rs" diff --git a/src/client/connect.rs b/src/client/connect.rs index e371cca070..1e7b8cf3e5 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -125,10 +125,11 @@ impl Connected { mod http { use super::*; + use std::borrow::Cow; use std::fmt; use std::io; use std::mem; - use std::net::SocketAddr; + use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::time::Duration; @@ -146,30 +147,35 @@ mod http { use self::http_connector::HttpConnectorBlockingTask; - fn connect(addr: &SocketAddr, handle: &Option) -> io::Result { - if let Some(ref handle) = *handle { - let builder = match addr { - &SocketAddr::V4(_) => TcpBuilder::new_v4()?, - &SocketAddr::V6(_) => TcpBuilder::new_v6()?, + fn connect(addr: &SocketAddr, local_addr: &Option, handle: &Option) -> io::Result { + let builder = match addr { + &SocketAddr::V4(_) => TcpBuilder::new_v4()?, + &SocketAddr::V6(_) => TcpBuilder::new_v6()?, + }; + + if let Some(ref local_addr) = *local_addr { + // Caller has requested this socket be bound before calling connect + builder.bind(SocketAddr::new(local_addr.clone(), 0))?; + } + else if cfg!(windows) { + // Windows requires a socket be bound before calling connect + let any: SocketAddr = match addr { + &SocketAddr::V4(_) => { + ([0, 0, 0, 0], 0).into() + }, + &SocketAddr::V6(_) => { + ([0, 0, 0, 0, 0, 0, 0, 0], 0).into() + } }; + builder.bind(any)?; + } - if cfg!(windows) { - // Windows requires a socket be bound before calling connect - let any: SocketAddr = match addr { - &SocketAddr::V4(_) => { - ([0, 0, 0, 0], 0).into() - }, - &SocketAddr::V6(_) => { - ([0, 0, 0, 0, 0, 0, 0, 0], 0).into() - } - }; - builder.bind(any)?; - } + let handle = match *handle { + Some(ref handle) => Cow::Borrowed(handle), + None => Cow::Owned(Handle::current()), + }; - Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, handle)) - } else { - Ok(TcpStream::connect(addr)) - } + Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, &handle)) } /// A connector for the `http` scheme. @@ -182,6 +188,7 @@ mod http { handle: Option, keep_alive_timeout: Option, nodelay: bool, + local_address: Option, } impl HttpConnector { @@ -218,6 +225,7 @@ mod http { handle, keep_alive_timeout: None, nodelay: false, + local_address: None, } } @@ -246,6 +254,16 @@ mod http { pub fn set_nodelay(&mut self, nodelay: bool) { self.nodelay = nodelay; } + + /// Set that all sockets are bound to the configured address before connection. + /// + /// If `None`, the sockets will not be bound. + /// + /// Default is `None`. + #[inline] + pub fn set_local_address(&mut self, addr: Option) { + self.local_address = addr; + } } impl fmt::Debug for HttpConnector { @@ -287,7 +305,7 @@ mod http { }; HttpConnecting { - state: State::Lazy(self.executor.clone(), host.into(), port), + state: State::Lazy(self.executor.clone(), host.into(), port, self.local_address), handle: self.handle.clone(), keep_alive_timeout: self.keep_alive_timeout, nodelay: self.nodelay, @@ -337,8 +355,8 @@ mod http { } enum State { - Lazy(HttpConnectExecutor, String, u16), - Resolving(oneshot::SpawnHandle), + Lazy(HttpConnectExecutor, String, u16, Option), + Resolving(oneshot::SpawnHandle, Option), Connecting(ConnectingTcp), Error(Option), } @@ -351,26 +369,28 @@ mod http { loop { let state; match self.state { - State::Lazy(ref executor, ref mut host, port) => { + State::Lazy(ref executor, ref mut host, port, local_addr) => { // If the host is already an IP addr (v4 or v6), // skip resolving the dns and start connecting right away. if let Some(addrs) = dns::IpAddrs::try_parse(host, port) { state = State::Connecting(ConnectingTcp { addrs: addrs, + local_addr: local_addr, current: None }) } else { let host = mem::replace(host, String::new()); let work = dns::Work::new(host, port); - state = State::Resolving(oneshot::spawn(work, executor)); + state = State::Resolving(oneshot::spawn(work, executor), local_addr); } }, - State::Resolving(ref mut future) => { + State::Resolving(ref mut future, local_addr) => { match try!(future.poll()) { Async::NotReady => return Ok(Async::NotReady), Async::Ready(addrs) => { state = State::Connecting(ConnectingTcp { addrs: addrs, + local_addr: local_addr, current: None, }) } @@ -402,6 +422,7 @@ mod http { struct ConnectingTcp { addrs: dns::IpAddrs, + local_addr: Option, current: Option, } @@ -418,14 +439,14 @@ mod http { err = Some(e); if let Some(addr) = self.addrs.next() { debug!("connecting to {}", addr); - *current = connect(&addr, handle)?; + *current = connect(&addr, &self.local_addr, handle)?; continue; } } } } else if let Some(addr) = self.addrs.next() { debug!("connecting to {}", addr); - self.current = Some(connect(&addr, handle)?); + self.current = Some(connect(&addr, &self.local_addr, handle)?); continue; }