Skip to content

Commit

Permalink
feat(client): change Resolve to be Service<Name>
Browse files Browse the repository at this point in the history
Closes #1903

BREAKING CHANGE: The `Resolve` trait is gone. All custom resolves should
  implement `tower::Service` instead.

  The error type of `HttpConnector` has been changed away from
  `std::io::Error`.
  • Loading branch information
seanmonstar committed Nov 12, 2019
1 parent 039281b commit 9d9233c
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 130 deletions.
131 changes: 73 additions & 58 deletions src/client/connect/dns.rs
@@ -1,11 +1,26 @@
//! The `Resolve` trait, support types, and some basic implementations.
//! DNS Resolution used by the `HttpConnector`.
//!
//! This module contains:
//!
//! - A [`GaiResolver`](dns::GaiResolver) that is the default resolver for the
//! `HttpConnector`.
//! - The [`Resolve`](dns::Resolve) trait and related types to build a custom
//! resolver for use with the `HttpConnector`.
//! - The `Name` type used as an argument to custom resolvers.
//!
//! # Resolvers are `Service`s
//!
//! A resolver is just a
//! `Service<Name, Response = impl Iterator<Item = IpAddr>>`.
//!
//! A simple resolver that ignores the name and always returns a specific
//! address:
//!
//! ```rust,ignore
//! use std::{convert::Infallible, iter, net::IpAddr};
//!
//! let resolver = tower::service_fn(|_name| async {
//! Ok::<_, Infallible>(iter::once(IpAddr::from([127, 0, 0, 1])))
//! });
//! ```
use std::{fmt, io, vec};
use std::error::Error;
use std::net::{
Expand All @@ -15,19 +30,10 @@ use std::net::{
};
use std::str::FromStr;

use tokio_sync::{mpsc, oneshot};
use tower_service::Service;
use crate::common::{Future, Pin, Poll, task};

use crate::common::{Future, Never, Pin, Poll, task};

/// Resolve a hostname to a set of IP addresses.
pub trait Resolve {
/// The set of IP addresses to try to connect to.
type Addrs: Iterator<Item=IpAddr>;
/// A Future of the resolved set of addresses.
type Future: Future<Output=Result<Self::Addrs, io::Error>>;
/// Resolve a hostname.
fn resolve(&self, name: Name) -> Self::Future;
}
pub(super) use self::sealed::Resolve;

/// A domain name to resolve into IP addresses.
#[derive(Clone, Hash, Eq, PartialEq)]
Expand All @@ -41,15 +47,12 @@ pub struct GaiResolver {
_priv: (),
}

#[derive(Clone)]
struct ThreadPoolKeepAlive(mpsc::Sender<Never>);

/// An iterator of IP addresses returned from `getaddrinfo`.
pub struct GaiAddrs {
inner: IpAddrs,
}

/// A future to resole a name returned by `GaiResolver`.
/// A future to resolve a name returned by `GaiResolver`.
pub struct GaiFuture {
inner: tokio_executor::blocking::Blocking<Result<IpAddrs, io::Error>>,
}
Expand Down Expand Up @@ -110,11 +113,16 @@ impl GaiResolver {
}
}

impl Resolve for GaiResolver {
type Addrs = GaiAddrs;
impl Service<Name> for GaiResolver {
type Response = GaiAddrs;
type Error = io::Error;
type Future = GaiFuture;

fn resolve(&self, name: Name) -> Self::Future {
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, name: Name) -> Self::Future {
let blocking = tokio_executor::blocking::run(move || {
debug!("resolving host={:?}", name.host);
(&*name.host, 0).to_socket_addrs()
Expand Down Expand Up @@ -164,39 +172,6 @@ impl fmt::Debug for GaiAddrs {
}
}


pub(super) struct GaiBlocking {
host: String,
tx: Option<oneshot::Sender<io::Result<IpAddrs>>>,
}

impl GaiBlocking {
fn block(&self) -> io::Result<IpAddrs> {
debug!("resolving host={:?}", self.host);
(&*self.host, 0).to_socket_addrs()
.map(|i| IpAddrs { iter: i })

}
}

impl Future for GaiBlocking {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
if self.tx.as_mut().expect("polled after complete").poll_closed(cx).is_ready() {
trace!("resolve future canceled for {:?}", self.host);
return Poll::Ready(());
}

let res = self.block();

let tx = self.tx.take().expect("polled after complete");
let _ = tx.send(res);

Poll::Ready(())
}
}

pub(super) struct IpAddrs {
iter: vec::IntoIter<SocketAddr>,
}
Expand Down Expand Up @@ -276,11 +251,16 @@ impl TokioThreadpoolGaiResolver {
}

#[cfg(feature = "runtime")]
impl Resolve for TokioThreadpoolGaiResolver {
type Addrs = GaiAddrs;
impl Service<Name> for TokioThreadpoolGaiResolver {
type Response = GaiAddrs;
type Error = io::Error;
type Future = TokioThreadpoolGaiFuture;

fn resolve(&self, name: Name) -> TokioThreadpoolGaiFuture {
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, name: Name) -> Self::Future {
TokioThreadpoolGaiFuture { name }
}
}
Expand All @@ -299,6 +279,41 @@ impl Future for TokioThreadpoolGaiFuture {
}
}

mod sealed {
use tower_service::Service;
use crate::common::{Future, Poll, task};
use super::{IpAddr, Name};

// "Trait alias" for `Service<Name, Response = Addrs>`
pub trait Resolve {
type Addrs: Iterator<Item=IpAddr>;
type Error: Into<Box<dyn std::error::Error + Send + Sync>>;
type Future: Future<Output=Result<Self::Addrs, Self::Error>>;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
fn resolve(&mut self, name: Name) -> Self::Future;
}

impl<S> Resolve for S
where
S: Service<Name>,
S::Response: Iterator<Item=IpAddr>,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
type Addrs = S::Response;
type Error = S::Error;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Service::poll_ready(self, cx)
}

fn resolve(&mut self, name: Name) -> Self::Future {
Service::call(self, name)
}
}
}

#[cfg(test)]
mod tests {
use std::net::{Ipv4Addr, Ipv6Addr};
Expand Down

0 comments on commit 9d9233c

Please sign in to comment.