Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial push to add tower based clients #1915

Merged
merged 8 commits into from Aug 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Expand Up @@ -27,7 +27,7 @@ futures-core-preview = { version = "=0.3.0-alpha.18" }
futures-channel-preview = { version = "=0.3.0-alpha.18" }
futures-util-preview = { version = "=0.3.0-alpha.18" }
http = "0.1.15"
http-body = { git = "https://github.com/hyperium/http-body" }
http-body = "0.2.0-alpha.1"
httparse = "1.0"
h2 = { git = "https://github.com/hyperium/h2" }
iovec = "0.1"
Expand All @@ -38,6 +38,7 @@ pin-utils = "=0.1.0-alpha.4"
time = "0.1"
tokio = { version = "=0.2.0-alpha.4", optional = true, default-features = false, features = ["rt-full"] }
tower-service = "=0.3.0-alpha.1"
tower-make = { version = "0.1.0-alpha.2", features = ['io'] }
tokio-executor = { version = "=0.2.0-alpha.4", features = ["blocking"] }
tokio-io = "=0.2.0-alpha.4"
tokio-sync = "=0.2.0-alpha.4"
Expand Down
26 changes: 26 additions & 0 deletions examples/tower_client.rs
@@ -0,0 +1,26 @@

use hyper::client::service::{Connect, Service, MakeService};
use hyper::client::conn::Builder;
use hyper::client::connect::HttpConnector;
use hyper::{Body, Request};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
pretty_env_logger::init();

let mut mk_svc = Connect::new(HttpConnector::new(), Builder::new());

let uri = "http://127.0.0.1:8080".parse::<http::Uri>()?;


let mut svc = mk_svc.make_service(uri.clone()).await?;

let body = Body::empty();

let req = Request::get(uri).body(body)?;
let res = svc.call(req).await?;

println!("RESPONSE={:?}", res);

Ok(())
}
18 changes: 11 additions & 7 deletions src/client/conn.rs
Expand Up @@ -14,6 +14,7 @@ use std::sync::Arc;
use bytes::Bytes;
use futures_util::future::{self, Either, FutureExt as _};
use tokio_io::{AsyncRead, AsyncWrite};
use tower_service::Service;

use crate::body::Payload;
use crate::common::{Exec, Future, Pin, Poll, task};
Expand Down Expand Up @@ -242,18 +243,21 @@ where
}
}

/* TODO(0.12.0): when we change from tokio-service to tower.
impl<T, B> Service for SendRequest<T, B> {
type Request = Request<B>;
type Response = Response;
type Error = ::Error;
impl<B> Service<Request<B>> for SendRequest<B>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking we should rename this to Connection<B> and Background<T, B> for the old Connection.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm undecided. Is there an issue with the current naming?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that Service1<Service2<Connection<Request>>> seems much better than Service1<Service2<SendRequest<Request>>> thats really it.

where
B: Payload + 'static, {
type Response = Response<Body>;
type Error = crate::Error;
type Future = ResponseFuture;

fn call(&self, req: Self::Request) -> Self::Future {
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_ready(cx)
}

fn call(&mut self, req: Request<B>) -> Self::Future {
self.send_request(req)
}
}
*/

impl<B> fmt::Debug for SendRequest<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
60 changes: 59 additions & 1 deletion src/client/connect/http.rs
Expand Up @@ -5,7 +5,8 @@ use std::mem;
use std::net::{IpAddr, SocketAddr};
use std::time::Duration;

use http::uri::Scheme;
use http::uri::{Scheme, Uri};
use futures_util::{TryFutureExt, FutureExt};
use net2::TcpBuilder;
use tokio_net::driver::Handle;
use tokio_net::tcp::TcpStream;
Expand Down Expand Up @@ -248,6 +249,63 @@ where
}
}

impl<R> tower_service::Service<Uri> for HttpConnector<R>
where
R: Resolve + Clone + Send + Sync + 'static,
R::Future: Send,
{
type Response = TcpStream;
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}

fn call(&mut self, dst: Uri) -> Self::Future {
// TODO: return error here
let dst = Destination::try_from_uri(dst).unwrap();

trace!(
"Http::connect; scheme={}, host={}, port={:?}",
dst.scheme(),
dst.host(),
dst.port(),
);

if self.enforce_http {
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
return invalid_url::<R>(InvalidUrl::NotHttp, &self.handle).map_ok(|(s, _)| s).boxed();
}
} else if dst.uri.scheme_part().is_none() {
return invalid_url::<R>(InvalidUrl::MissingScheme, &self.handle).map_ok(|(s, _)| s).boxed();
}

let host = match dst.uri.host() {
Some(s) => s,
None => return invalid_url::<R>(InvalidUrl::MissingAuthority, &self.handle).map_ok(|(s, _)| s).boxed(),
};
let port = match dst.uri.port_part() {
Some(port) => port.as_u16(),
None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
};

let fut = HttpConnecting {
state: State::Lazy(self.resolver.clone(), host.into(), self.local_address),
handle: self.handle.clone(),
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay,
port,
reuse_address: self.reuse_address,
send_buffer_size: self.send_buffer_size,
recv_buffer_size: self.recv_buffer_size,
};

fut.map_ok(|(s, _)| s).boxed()
}
}

impl HttpInfo {
/// Get the remote address of the transport used.
pub fn remote_addr(&self) -> SocketAddr {
Expand Down
1 change: 1 addition & 0 deletions src/client/mod.rs
Expand Up @@ -81,6 +81,7 @@ pub mod conn;
pub mod connect;
pub(crate) mod dispatch;
mod pool;
pub mod service;
#[cfg(test)]
mod tests;

Expand Down
85 changes: 85 additions & 0 deletions src/client/service.rs
@@ -0,0 +1,85 @@
//! Utilities used to interact with the Tower ecosystem.
//!
//! This module provides exports of `Service`, `MakeService` and `Connect` which
//! all provide hook-ins into the Tower ecosystem.

use super::conn::{SendRequest, Builder};
use std::marker::PhantomData;
use crate::{common::{Poll, task, Pin}, body::Payload};
use std::future::Future;
use std::error::Error as StdError;
use tower_make::MakeConnection;

pub use tower_service::Service;
pub use tower_make::MakeService;

/// Creates a connection via `SendRequest`.
///
/// This accepts a `hyper::client::conn::Builder` and provides
/// a `MakeService` implementation to create connections from some
/// target `T`.
#[derive(Debug)]
pub struct Connect<C, B, T> {
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
inner: C,
builder: Builder,
_pd: PhantomData<fn(T, B)>
}

impl<C, B, T> Connect<C, B, T> {
/// Create a new `Connect` with some inner connector `C` and a connection
/// builder.
pub fn new(inner: C, builder: Builder) -> Self {
Self {
inner,
builder,
_pd: PhantomData
}
}
}

impl<C, B, T> Service<T> for Connect<C, B, T>
where
C: MakeConnection<T>,
C::Connection: Unpin + Send + 'static,
C::Future: Send + 'static,
C::Error: Into<Box<dyn StdError + Send + Sync>> + Send,
B: Payload + Unpin + 'static,
B::Data: Unpin,
{
type Response = SendRequest<B>;
type Error = crate::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(|e| crate::Error::new(crate::error::Kind::Connect).with(e.into()))
}

fn call(&mut self, req: T) -> Self::Future {
let builder = self.builder.clone();
let io = self.inner.make_connection(req);

let fut = async move {
match io.await {
Ok(io) => {
match builder.handshake(io).await {
Ok((sr, conn)) => {
builder.exec.execute(async move {
if let Err(e) = conn.await {
debug!("connection error: {:?}", e);
}
})?;
Ok(sr)
},
Err(e) => Err(e)
}
},
Err(e) => {
let err = crate::Error::new(crate::error::Kind::Connect).with(e.into());
Err(err)
}
}
};

Box::pin(fut)
}
}