Skip to content

Commit

Permalink
feat(client): provide tower::Service support for clients (#1915)
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioFranco authored and seanmonstar committed Aug 30, 2019
1 parent eef407d commit eee2a72
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 9 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Expand Up @@ -27,7 +27,7 @@ futures-core-preview = { version = "=0.3.0-alpha.18" }
futures-channel-preview = { version = "=0.3.0-alpha.18" }
futures-util-preview = { version = "=0.3.0-alpha.18" }
http = "0.1.15"
http-body = { git = "https://github.com/hyperium/http-body" }
http-body = "0.2.0-alpha.1"
httparse = "1.0"
h2 = { git = "https://github.com/hyperium/h2" }
iovec = "0.1"
Expand All @@ -38,6 +38,7 @@ pin-utils = "=0.1.0-alpha.4"
time = "0.1"
tokio = { version = "=0.2.0-alpha.4", optional = true, default-features = false, features = ["rt-full"] }
tower-service = "=0.3.0-alpha.1"
tower-make = { version = "0.1.0-alpha.2", features = ['io'] }
tokio-executor = { version = "=0.2.0-alpha.4", features = ["blocking"] }
tokio-io = "=0.2.0-alpha.4"
tokio-sync = "=0.2.0-alpha.4"
Expand Down
26 changes: 26 additions & 0 deletions examples/tower_client.rs
@@ -0,0 +1,26 @@

use hyper::client::service::{Connect, Service, MakeService};
use hyper::client::conn::Builder;
use hyper::client::connect::HttpConnector;
use hyper::{Body, Request};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
pretty_env_logger::init();

let mut mk_svc = Connect::new(HttpConnector::new(), Builder::new());

let uri = "http://127.0.0.1:8080".parse::<http::Uri>()?;


let mut svc = mk_svc.make_service(uri.clone()).await?;

let body = Body::empty();

let req = Request::get(uri).body(body)?;
let res = svc.call(req).await?;

println!("RESPONSE={:?}", res);

Ok(())
}
18 changes: 11 additions & 7 deletions src/client/conn.rs
Expand Up @@ -14,6 +14,7 @@ use std::sync::Arc;
use bytes::Bytes;
use futures_util::future::{self, Either, FutureExt as _};
use tokio_io::{AsyncRead, AsyncWrite};
use tower_service::Service;

use crate::body::Payload;
use crate::common::{Exec, Future, Pin, Poll, task};
Expand Down Expand Up @@ -242,18 +243,21 @@ where
}
}

/* TODO(0.12.0): when we change from tokio-service to tower.
impl<T, B> Service for SendRequest<T, B> {
type Request = Request<B>;
type Response = Response;
type Error = ::Error;
impl<B> Service<Request<B>> for SendRequest<B>
where
B: Payload + 'static, {
type Response = Response<Body>;
type Error = crate::Error;
type Future = ResponseFuture;

fn call(&self, req: Self::Request) -> Self::Future {
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_ready(cx)
}

fn call(&mut self, req: Request<B>) -> Self::Future {
self.send_request(req)
}
}
*/

impl<B> fmt::Debug for SendRequest<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
60 changes: 59 additions & 1 deletion src/client/connect/http.rs
Expand Up @@ -5,7 +5,8 @@ use std::mem;
use std::net::{IpAddr, SocketAddr};
use std::time::Duration;

use http::uri::Scheme;
use http::uri::{Scheme, Uri};
use futures_util::{TryFutureExt, FutureExt};
use net2::TcpBuilder;
use tokio_net::driver::Handle;
use tokio_net::tcp::TcpStream;
Expand Down Expand Up @@ -248,6 +249,63 @@ where
}
}

impl<R> tower_service::Service<Uri> for HttpConnector<R>
where
R: Resolve + Clone + Send + Sync + 'static,
R::Future: Send,
{
type Response = TcpStream;
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}

fn call(&mut self, dst: Uri) -> Self::Future {
// TODO: return error here
let dst = Destination::try_from_uri(dst).unwrap();

trace!(
"Http::connect; scheme={}, host={}, port={:?}",
dst.scheme(),
dst.host(),
dst.port(),
);

if self.enforce_http {
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
return invalid_url::<R>(InvalidUrl::NotHttp, &self.handle).map_ok(|(s, _)| s).boxed();
}
} else if dst.uri.scheme_part().is_none() {
return invalid_url::<R>(InvalidUrl::MissingScheme, &self.handle).map_ok(|(s, _)| s).boxed();
}

let host = match dst.uri.host() {
Some(s) => s,
None => return invalid_url::<R>(InvalidUrl::MissingAuthority, &self.handle).map_ok(|(s, _)| s).boxed(),
};
let port = match dst.uri.port_part() {
Some(port) => port.as_u16(),
None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
};

let fut = HttpConnecting {
state: State::Lazy(self.resolver.clone(), host.into(), self.local_address),
handle: self.handle.clone(),
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay,
port,
reuse_address: self.reuse_address,
send_buffer_size: self.send_buffer_size,
recv_buffer_size: self.recv_buffer_size,
};

fut.map_ok(|(s, _)| s).boxed()
}
}

impl HttpInfo {
/// Get the remote address of the transport used.
pub fn remote_addr(&self) -> SocketAddr {
Expand Down
1 change: 1 addition & 0 deletions src/client/mod.rs
Expand Up @@ -81,6 +81,7 @@ pub mod conn;
pub mod connect;
pub(crate) mod dispatch;
mod pool;
pub mod service;
#[cfg(test)]
mod tests;

Expand Down
85 changes: 85 additions & 0 deletions src/client/service.rs
@@ -0,0 +1,85 @@
//! Utilities used to interact with the Tower ecosystem.
//!
//! This module provides exports of `Service`, `MakeService` and `Connect` which
//! all provide hook-ins into the Tower ecosystem.

use super::conn::{SendRequest, Builder};
use std::marker::PhantomData;
use crate::{common::{Poll, task, Pin}, body::Payload};
use std::future::Future;
use std::error::Error as StdError;
use tower_make::MakeConnection;

pub use tower_service::Service;
pub use tower_make::MakeService;

/// Creates a connection via `SendRequest`.
///
/// This accepts a `hyper::client::conn::Builder` and provides
/// a `MakeService` implementation to create connections from some
/// target `T`.
#[derive(Debug)]
pub struct Connect<C, B, T> {
inner: C,
builder: Builder,
_pd: PhantomData<fn(T, B)>
}

impl<C, B, T> Connect<C, B, T> {
/// Create a new `Connect` with some inner connector `C` and a connection
/// builder.
pub fn new(inner: C, builder: Builder) -> Self {
Self {
inner,
builder,
_pd: PhantomData
}
}
}

impl<C, B, T> Service<T> for Connect<C, B, T>
where
C: MakeConnection<T>,
C::Connection: Unpin + Send + 'static,
C::Future: Send + 'static,
C::Error: Into<Box<dyn StdError + Send + Sync>> + Send,
B: Payload + Unpin + 'static,
B::Data: Unpin,
{
type Response = SendRequest<B>;
type Error = crate::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(|e| crate::Error::new(crate::error::Kind::Connect).with(e.into()))
}

fn call(&mut self, req: T) -> Self::Future {
let builder = self.builder.clone();
let io = self.inner.make_connection(req);

let fut = async move {
match io.await {
Ok(io) => {
match builder.handshake(io).await {
Ok((sr, conn)) => {
builder.exec.execute(async move {
if let Err(e) = conn.await {
debug!("connection error: {:?}", e);
}
})?;
Ok(sr)
},
Err(e) => Err(e)
}
},
Err(e) => {
let err = crate::Error::new(crate::error::Kind::Connect).with(e.into());
Err(err)
}
}
};

Box::pin(fut)
}
}

0 comments on commit eee2a72

Please sign in to comment.