diff --git a/Cargo.toml b/Cargo.toml index d22904631c..1ef743c477 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ futures-core-preview = { version = "=0.3.0-alpha.18" } futures-channel-preview = { version = "=0.3.0-alpha.18" } futures-util-preview = { version = "=0.3.0-alpha.18" } http = "0.1.15" -http-body = { git = "https://github.com/hyperium/http-body" } +http-body = "0.2.0-alpha.1" httparse = "1.0" h2 = { git = "https://github.com/hyperium/h2" } iovec = "0.1" @@ -38,6 +38,7 @@ pin-utils = "=0.1.0-alpha.4" time = "0.1" tokio = { version = "=0.2.0-alpha.4", optional = true, default-features = false, features = ["rt-full"] } tower-service = "=0.3.0-alpha.1" +tower-make = { version = "0.1.0-alpha.2", features = ['io'] } tokio-executor = { version = "=0.2.0-alpha.4", features = ["blocking"] } tokio-io = "=0.2.0-alpha.4" tokio-sync = "=0.2.0-alpha.4" diff --git a/examples/tower_client.rs b/examples/tower_client.rs new file mode 100644 index 0000000000..634099f86e --- /dev/null +++ b/examples/tower_client.rs @@ -0,0 +1,26 @@ + +use hyper::client::service::{Connect, Service, MakeService}; +use hyper::client::conn::Builder; +use hyper::client::connect::HttpConnector; +use hyper::{Body, Request}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + pretty_env_logger::init(); + + let mut mk_svc = Connect::new(HttpConnector::new(), Builder::new()); + + let uri = "http://127.0.0.1:8080".parse::()?; + + + let mut svc = mk_svc.make_service(uri.clone()).await?; + + let body = Body::empty(); + + let req = Request::get(uri).body(body)?; + let res = svc.call(req).await?; + + println!("RESPONSE={:?}", res); + + Ok(()) +} diff --git a/src/client/conn.rs b/src/client/conn.rs index a1603814e6..ef5db41f9e 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -14,6 +14,7 @@ use std::sync::Arc; use bytes::Bytes; use futures_util::future::{self, Either, FutureExt as _}; use tokio_io::{AsyncRead, AsyncWrite}; +use tower_service::Service; use crate::body::Payload; use crate::common::{Exec, Future, Pin, Poll, task}; @@ -242,18 +243,21 @@ where } } -/* TODO(0.12.0): when we change from tokio-service to tower. -impl Service for SendRequest { - type Request = Request; - type Response = Response; - type Error = ::Error; +impl Service> for SendRequest +where + B: Payload + 'static, { + type Response = Response; + type Error = crate::Error; type Future = ResponseFuture; - fn call(&self, req: Self::Request) -> Self::Future { + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + self.poll_ready(cx) + } + fn call(&mut self, req: Request) -> Self::Future { + self.send_request(req) } } -*/ impl fmt::Debug for SendRequest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index 521b5be2a4..f54cd2e758 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -5,7 +5,8 @@ use std::mem; use std::net::{IpAddr, SocketAddr}; use std::time::Duration; -use http::uri::Scheme; +use http::uri::{Scheme, Uri}; +use futures_util::{TryFutureExt, FutureExt}; use net2::TcpBuilder; use tokio_net::driver::Handle; use tokio_net::tcp::TcpStream; @@ -248,6 +249,63 @@ where } } +impl tower_service::Service for HttpConnector +where + R: Resolve + Clone + Send + Sync + 'static, + R::Future: Send, +{ + type Response = TcpStream; + type Error = io::Error; + type Future = Pin> + Send + 'static>>; + + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { + Ok(()).into() + } + + fn call(&mut self, dst: Uri) -> Self::Future { + // TODO: return error here + let dst = Destination::try_from_uri(dst).unwrap(); + + trace!( + "Http::connect; scheme={}, host={}, port={:?}", + dst.scheme(), + dst.host(), + dst.port(), + ); + + if self.enforce_http { + if dst.uri.scheme_part() != Some(&Scheme::HTTP) { + return invalid_url::(InvalidUrl::NotHttp, &self.handle).map_ok(|(s, _)| s).boxed(); + } + } else if dst.uri.scheme_part().is_none() { + return invalid_url::(InvalidUrl::MissingScheme, &self.handle).map_ok(|(s, _)| s).boxed(); + } + + let host = match dst.uri.host() { + Some(s) => s, + None => return invalid_url::(InvalidUrl::MissingAuthority, &self.handle).map_ok(|(s, _)| s).boxed(), + }; + let port = match dst.uri.port_part() { + Some(port) => port.as_u16(), + None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 }, + }; + + let fut = HttpConnecting { + state: State::Lazy(self.resolver.clone(), host.into(), self.local_address), + handle: self.handle.clone(), + happy_eyeballs_timeout: self.happy_eyeballs_timeout, + keep_alive_timeout: self.keep_alive_timeout, + nodelay: self.nodelay, + port, + reuse_address: self.reuse_address, + send_buffer_size: self.send_buffer_size, + recv_buffer_size: self.recv_buffer_size, + }; + + fut.map_ok(|(s, _)| s).boxed() + } +} + impl HttpInfo { /// Get the remote address of the transport used. pub fn remote_addr(&self) -> SocketAddr { diff --git a/src/client/mod.rs b/src/client/mod.rs index 2d92c1d260..5c976eee91 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -81,6 +81,7 @@ pub mod conn; pub mod connect; pub(crate) mod dispatch; mod pool; +pub mod service; #[cfg(test)] mod tests; diff --git a/src/client/service.rs b/src/client/service.rs new file mode 100644 index 0000000000..169f911415 --- /dev/null +++ b/src/client/service.rs @@ -0,0 +1,85 @@ +//! Utilities used to interact with the Tower ecosystem. +//! +//! This module provides exports of `Service`, `MakeService` and `Connect` which +//! all provide hook-ins into the Tower ecosystem. + +use super::conn::{SendRequest, Builder}; +use std::marker::PhantomData; +use crate::{common::{Poll, task, Pin}, body::Payload}; +use std::future::Future; +use std::error::Error as StdError; +use tower_make::MakeConnection; + +pub use tower_service::Service; +pub use tower_make::MakeService; + +/// Creates a connection via `SendRequest`. +/// +/// This accepts a `hyper::client::conn::Builder` and provides +/// a `MakeService` implementation to create connections from some +/// target `T`. +#[derive(Debug)] +pub struct Connect { + inner: C, + builder: Builder, + _pd: PhantomData +} + +impl Connect { + /// Create a new `Connect` with some inner connector `C` and a connection + /// builder. + pub fn new(inner: C, builder: Builder) -> Self { + Self { + inner, + builder, + _pd: PhantomData + } + } +} + +impl Service for Connect +where + C: MakeConnection, + C::Connection: Unpin + Send + 'static, + C::Future: Send + 'static, + C::Error: Into> + Send, + B: Payload + Unpin + 'static, + B::Data: Unpin, +{ + type Response = SendRequest; + type Error = crate::Error; + type Future = Pin> + Send + 'static>>; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(|e| crate::Error::new(crate::error::Kind::Connect).with(e.into())) + } + + fn call(&mut self, req: T) -> Self::Future { + let builder = self.builder.clone(); + let io = self.inner.make_connection(req); + + let fut = async move { + match io.await { + Ok(io) => { + match builder.handshake(io).await { + Ok((sr, conn)) => { + builder.exec.execute(async move { + if let Err(e) = conn.await { + debug!("connection error: {:?}", e); + } + })?; + Ok(sr) + }, + Err(e) => Err(e) + } + }, + Err(e) => { + let err = crate::Error::new(crate::error::Kind::Connect).with(e.into()); + Err(err) + } + } + }; + + Box::pin(fut) + } +}