From ea36d96eff8f3ab68dfb5b10fc3a3a6cd1603bf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Sun, 24 Nov 2019 23:18:42 +0000 Subject: [PATCH 1/3] remove ReplyFuture, add it's behaviour on FilteredFuture --- src/filter/service.rs | 14 +++++++++++-- src/server.rs | 49 +++++++------------------------------------ 2 files changed, 19 insertions(+), 44 deletions(-) diff --git a/src/filter/service.rs b/src/filter/service.rs index c6680ce5c..3294e1711 100644 --- a/src/filter/service.rs +++ b/src/filter/service.rs @@ -49,8 +49,10 @@ pub struct FilteredFuture { impl Future for FilteredFuture where F: TryFuture, + F::Ok: Reply, + F::Error: IsReject, { - type Output = Result; + type Output = Result; #[inline] fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { @@ -58,7 +60,15 @@ where let pin = self.project(); let fut = pin.future; - route::set(&pin.route, || fut.try_poll(cx)) + + match route::set(pin.route, || fut.try_poll(cx)) { + Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok.into_response())), + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => { + log::debug!("rejected: {:?}", err); + Poll::Ready(Ok(err.into_response())) + } + } } } diff --git a/src/server.rs b/src/server.rs index e8f205971..cd506ebac 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,22 +3,18 @@ use std::net::SocketAddr; #[cfg(feature = "tls")] use std::path::Path; use std::sync::Arc; -use std::pin::Pin; -use std::task::{Context, Poll}; use std::future::Future; -use std::convert::Infallible; -use pin_project::pin_project; use futures::{future, FutureExt, TryFuture, TryStream, TryStreamExt}; use hyper::server::conn::AddrIncoming; use hyper::service::{make_service_fn, service_fn}; use hyper::{Server as HyperServer}; use tokio::io::{AsyncRead, AsyncWrite}; -use crate::reject::IsReject; -use crate::reply::Reply; use crate::transport::Transport; use crate::Request; +use crate::reply::Reply; +use crate::reject::IsReject; /// Create a `Server` with the provided service. pub fn serve(service: S) -> Server @@ -55,9 +51,9 @@ macro_rules! into_service { make_service_fn(move |transport| { let inner = inner.clone(); let remote_addr = Transport::remote_addr(transport); - future::ok::<_, hyper::Error>(service_fn(move |req| ReplyFuture { - inner: inner.call(req, remote_addr), - })) + future::ok::<_, hyper::Error>(service_fn(move |req| + inner.call(req, remote_addr), + )) }) }}; } @@ -121,7 +117,7 @@ macro_rules! try_bind { impl Server where - S: IntoWarpService + 'static, + S: IntoWarpService + 'static + Send, <::Reply as TryFuture>::Ok: Reply + Send, <::Reply as TryFuture>::Error: IsReject + Send, { @@ -441,37 +437,6 @@ pub trait IntoWarpService { } pub trait WarpService { - type Reply: TryFuture + Send; + type Reply: Future, std::convert::Infallible>> + Send; fn call(&self, req: Request, remote_addr: Option) -> Self::Reply; } - -// Optimizes better than using Future::then, since it doesn't -// have to return an IntoFuture. -#[pin_project] -#[derive(Debug)] -struct ReplyFuture { - #[pin] - inner: F, -} - -impl Future for ReplyFuture -where - F: TryFuture, - F::Ok: Reply, - F::Error: IsReject, -{ - type Output = Result; - - #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let pin = self.project(); - match pin.inner.try_poll(cx) { - Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok.into_response())), - Poll::Pending => Poll::Pending, - Poll::Ready(Err(err)) => { - log::debug!("rejected: {:?}", err); - Poll::Ready(Ok(err.into_response())) - } - } - } -} From 2a38e962d64e5cd85cb56b9886f7943972c21765 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Sun, 24 Nov 2019 23:57:25 +0000 Subject: [PATCH 2/3] - impl tower_service::Service for FilteredFuture - add Filter::into_service function that converts a Warp Filter into a tower_service::Service - add example running a warp Filter on Hyper --- Cargo.toml | 1 + examples/hyper.rs | 26 ++++++++++++++++++++++++++ src/filter/mod.rs | 24 ++++++++++++++++++++++++ src/filter/service.rs | 28 ++++++++++++++++++++++++++-- src/lib.rs | 1 + src/server.rs | 42 ++++++++++++++++++++++++++++++------------ src/test.rs | 2 +- tests/ws.rs | 2 +- 8 files changed, 110 insertions(+), 16 deletions(-) create mode 100644 examples/hyper.rs diff --git a/Cargo.toml b/Cargo.toml index 394d09269..4df097562 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ rustls = { version = "0.16", optional = true } tungstenite = { default-features = false, version = "0.9", optional = true } urlencoding = "1.0.0" pin-project = "0.4.5" +tower-service = "0.3.0-alpha.2" [dev-dependencies] pretty_env_logger = "0.3" diff --git a/examples/hyper.rs b/examples/hyper.rs new file mode 100644 index 000000000..bfc08d8e5 --- /dev/null +++ b/examples/hyper.rs @@ -0,0 +1,26 @@ +extern crate warp; +extern crate hyper; + +use warp::Filter; +use hyper::Server; +use hyper::service::{make_service_fn, service_fn}; +use tower_service::Service; + +#[tokio::main] +async fn main() { + let addr = ([0, 0, 0, 0], 3030).into(); + + let routes = warp::any().map(|| "hello world"); + + let make_service = make_service_fn(move |_| { + let routes = routes.clone(); + futures::future::ok::<_, hyper::Error>(service_fn(move |req| routes.into_service().call(req))) + }); + + let server = Server::bind(&addr) + .serve(make_service); + + if let Err(e) = server.await { + eprintln!("server error: {}", e); + } +} diff --git a/src/filter/mod.rs b/src/filter/mod.rs index a9751080b..6779199cb 100644 --- a/src/filter/mod.rs +++ b/src/filter/mod.rs @@ -31,6 +31,7 @@ use self::recover::Recover; use self::unify::Unify; use self::untuple_one::UntupleOne; pub(crate) use self::wrap::{Wrap, WrapSealed}; +pub use service::FilteredService; // A crate-private base trait, allowing the actual `filter` method to change // signatures without it being a breaking change. @@ -397,6 +398,29 @@ pub trait Filter: FilterBase { { BoxedFilter::new(self) } + + /// Wrap the `Filter` so that it implements `tower_service::Service` + /// + /// # Example + /// + /// ``` + /// # extern crate warp; + /// # extern crate tower_service; + /// use warp::Filter; + /// use tower_service::Service as TowerService; + /// + /// fn tower_service() -> impl TowerService { + /// warp::any().map(|| "ok").into_service() + /// } + /// ``` + fn into_service(self) -> FilteredService + where + Self: Sized + Send + Sync + 'static, + Self::Extract: crate::Reply, + Self::Error: IsReject, + { + FilteredService{ filter: self } + } } impl Filter for T {} diff --git a/src/filter/service.rs b/src/filter/service.rs index 3294e1711..83ddb3430 100644 --- a/src/filter/service.rs +++ b/src/filter/service.rs @@ -14,7 +14,7 @@ use crate::{Filter, Request}; #[derive(Copy, Clone, Debug)] pub struct FilteredService { - filter: F, + pub(crate) filter: F, } impl WarpService for FilteredService @@ -26,7 +26,7 @@ where type Reply = FilteredFuture; #[inline] - fn call(&self, req: Request, remote_addr: Option) -> Self::Reply { + fn call(&mut self, req: Request, remote_addr: Option) -> Self::Reply { debug_assert!(!route::is_set(), "nested route::set calls"); let route = Route::new(req, remote_addr); @@ -99,3 +99,27 @@ where FilteredService { filter: self } } } + +impl tower_service::Service for FilteredService +where + F: Filter, + ::Ok: Reply, + ::Error: IsReject, +{ + type Response = crate::Response; + type Error = std::convert::Infallible; + type Future = FilteredFuture; + + fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: crate::Request) -> Self::Future { + let route = Route::new(req, None); + let fut = route::set(&route, || self.filter.filter()); + FilteredFuture { + future: fut, + route, + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 4a00e7a57..f33da73c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -163,3 +163,4 @@ pub use futures::{Future, Sink, Stream}; #[doc(hidden)] pub(crate) type Request = http::Request; +pub(crate) type Response = hyper::Response; diff --git a/src/server.rs b/src/server.rs index cd506ebac..7e6a18ea4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,24 +2,23 @@ use std::error::Error as StdError; use std::net::SocketAddr; #[cfg(feature = "tls")] use std::path::Path; -use std::sync::Arc; use std::future::Future; use futures::{future, FutureExt, TryFuture, TryStream, TryStreamExt}; use hyper::server::conn::AddrIncoming; -use hyper::service::{make_service_fn, service_fn}; +use hyper::service::make_service_fn; use hyper::{Server as HyperServer}; use tokio::io::{AsyncRead, AsyncWrite}; use crate::transport::Transport; use crate::Request; -use crate::reply::Reply; +use crate::Reply; use crate::reject::IsReject; /// Create a `Server` with the provided service. pub fn serve(service: S) -> Server where - S: IntoWarpService + 'static, + S: IntoWarpService + 'static + Clone, { Server { pipeline: false, @@ -43,17 +42,37 @@ pub struct TlsServer { tls: ::rustls::ServerConfig, } +pub struct HyperService { + filter: F, + remote_addr: Option, +} + +impl tower_service::Service for HyperService +where + F: WarpService, +{ + type Response = crate::Response; + type Error = std::convert::Infallible; + type Future = F::Reply; + + fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, req: crate::Request) -> Self::Future { + self.filter.call(req, self.remote_addr) + } +} + // Getting all various generic bounds to make this a re-usable method is // very complicated, so instead this is just a macro. macro_rules! into_service { ($into:expr) => {{ - let inner = Arc::new($into.into_warp_service()); + let inner = $into; make_service_fn(move |transport| { let inner = inner.clone(); let remote_addr = Transport::remote_addr(transport); - future::ok::<_, hyper::Error>(service_fn(move |req| - inner.call(req, remote_addr), - )) + future::ok::<_, hyper::Error>(HyperService{ filter: inner.into_warp_service() , remote_addr }) }) }}; } @@ -117,9 +136,7 @@ macro_rules! try_bind { impl Server where - S: IntoWarpService + 'static + Send, - <::Reply as TryFuture>::Ok: Reply + Send, - <::Reply as TryFuture>::Error: IsReject + Send, + S: IntoWarpService + 'static + Send + Clone, { /// Run this `Server` forever on the current thread. pub async fn run(self, addr: impl Into + 'static) { @@ -438,5 +455,6 @@ pub trait IntoWarpService { pub trait WarpService { type Reply: Future, std::convert::Infallible>> + Send; - fn call(&self, req: Request, remote_addr: Option) -> Self::Reply; + fn call(&mut self, req: Request, remote_addr: Option) -> Self::Reply; } + diff --git a/src/test.rs b/src/test.rs index 92202a594..491b818f0 100644 --- a/src/test.rs +++ b/src/test.rs @@ -452,7 +452,7 @@ impl WsBuilder { /// ``` pub async fn handshake(self, f: F) -> Result where - F: Filter + Send + Sync + 'static, + F: Filter + Send + Sync + 'static + Clone, F::Extract: Reply + Send, F::Error: IsReject + Send, { diff --git a/tests/ws.rs b/tests/ws.rs index bc4268b62..d1357369c 100644 --- a/tests/ws.rs +++ b/tests/ws.rs @@ -130,7 +130,7 @@ async fn limit_message_size() { assert!(client.recv().await.is_err()); } -fn ws_echo() -> impl Filter { +fn ws_echo() -> impl Filter + Clone { warp::ws().map(|ws: warp::ws::Ws| { ws.on_upgrade(|websocket| { // Just echo all messages back... From 5ffec7ddbd011f9db1bf99c6b0a1ceba06e26ef9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 26 Nov 2019 18:32:39 +0000 Subject: [PATCH 3/3] impl IntoWarpService for T: tower_service::Service> add warp::serve_service which runs tower_service::Service> --- examples/tower_service.rs | 40 +++++++++++++++++++++++++ src/filter/mod.rs | 7 ++--- src/filter/service.rs | 63 +++++++++++++++++++++++++++++++-------- src/lib.rs | 11 ++++--- src/server.rs | 48 ++++++++++++++++++----------- 5 files changed, 131 insertions(+), 38 deletions(-) create mode 100644 examples/tower_service.rs diff --git a/examples/tower_service.rs b/examples/tower_service.rs new file mode 100644 index 000000000..ecad3613e --- /dev/null +++ b/examples/tower_service.rs @@ -0,0 +1,40 @@ +extern crate warp; +extern crate hyper; + +use std::task::{Context, Poll}; +use std::pin::Pin; +use std::future::Future; +use warp::{Response, Request}; +use warp::reject::Reject; +use tower_service::Service; + +#[derive(Clone)] +struct TowerService; + +#[derive(Debug)] +struct ServiceError; +impl Reject for ServiceError {} + +impl Service for TowerService { + type Response = Response; + type Error = ServiceError; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _req: Request) -> Self::Future { + // Create the HTTP response + let resp = Response::new("hello world".into()); + + // Return the response as an immediate future + Box::pin(futures::future::ok(resp)) + } +} + +#[tokio::main] +async fn main() { + + warp::serve_service(TowerService).run(([127, 0, 0, 1], 3030)).await; +} diff --git a/src/filter/mod.rs b/src/filter/mod.rs index 6779199cb..fc2394443 100644 --- a/src/filter/mod.rs +++ b/src/filter/mod.rs @@ -31,7 +31,7 @@ use self::recover::Recover; use self::unify::Unify; use self::untuple_one::UntupleOne; pub(crate) use self::wrap::{Wrap, WrapSealed}; -pub use service::FilteredService; +pub use service::{FilteredService, TowerService}; // A crate-private base trait, allowing the actual `filter` method to change // signatures without it being a breaking change. @@ -404,12 +404,11 @@ pub trait Filter: FilterBase { /// # Example /// /// ``` - /// # extern crate warp; - /// # extern crate tower_service; /// use warp::Filter; /// use tower_service::Service as TowerService; + /// use warp::Request; /// - /// fn tower_service() -> impl TowerService { + /// fn tower_service() -> impl TowerService { /// warp::any().map(|| "ok").into_service() /// } /// ``` diff --git a/src/filter/service.rs b/src/filter/service.rs index 83ddb3430..977d38a37 100644 --- a/src/filter/service.rs +++ b/src/filter/service.rs @@ -72,31 +72,68 @@ where } } -impl IntoWarpService for FilteredService +#[derive(Copy, Clone, Debug)] +pub struct TowerService { + pub(crate) service: F, +} + +#[pin_project] +#[derive(Debug)] +pub struct TowerServiceFuture { + #[pin] + pub(crate) future: F, +} + +impl IntoWarpService for S where - F: Filter + Send + Sync + 'static, - F::Extract: Reply, - F::Error: IsReject, + S: tower_service::Service + Send + Sync + 'static, + S::Error: crate::reject::Reject, + S::Future: Send, { - type Service = FilteredService; + type Service = TowerService; #[inline] fn into_warp_service(self) -> Self::Service { - self + TowerService{ service: self } } } -impl IntoWarpService for F +impl WarpService for TowerService where - F: Filter + Send + Sync + 'static, - F::Extract: Reply, - F::Error: IsReject, + S: tower_service::Service + Send + Sync + 'static, + S::Error: crate::reject::Reject, + S::Future: Send +{ + type Reply = TowerServiceFuture; + + fn call(&mut self, req: Request, _remote_addr: Option) -> Self::Reply { + + TowerServiceFuture{ future: self.service.call(req) } + } +} + +impl Future for TowerServiceFuture +where + S: TryFuture, + S::Error: crate::reject::Reject, { - type Service = FilteredService; + type Output = Result; #[inline] - fn into_warp_service(self) -> Self::Service { - FilteredService { filter: self } + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + debug_assert!(!route::is_set(), "nested route::set calls"); + + let pin = self.project(); + let fut = pin.future; + + match fut.try_poll(cx) { + Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)), + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => { + log::debug!("rejected: {:?}", err); + Poll::Ready(Ok(crate::reject::custom(err).into_response())) + } + } } } diff --git a/src/lib.rs b/src/lib.rs index f33da73c7..4a292a9c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,10 +151,10 @@ pub use self::redirect::redirect; pub use self::reject::{reject, Rejection}; #[doc(hidden)] pub use self::reply::{reply, Reply}; -pub use self::server::{serve, Server}; +pub use self::server::{serve, serve_service, Server}; +pub use hyper::rt::spawn; #[doc(hidden)] pub use http; -pub use hyper::rt::spawn; #[doc(hidden)] pub use bytes::Buf; @@ -162,5 +162,8 @@ pub use bytes::Buf; pub use futures::{Future, Sink, Stream}; #[doc(hidden)] -pub(crate) type Request = http::Request; -pub(crate) type Response = hyper::Response; +///type alias for a Hyper Request +pub type Request = http::Request; + +///type alias for a Hyper Request +pub type Response = hyper::Response; diff --git a/src/server.rs b/src/server.rs index 7e6a18ea4..33dafb8b7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,7 +4,7 @@ use std::net::SocketAddr; use std::path::Path; use std::future::Future; -use futures::{future, FutureExt, TryFuture, TryStream, TryStreamExt}; +use futures::{future, FutureExt, TryStream, TryStreamExt}; use hyper::server::conn::AddrIncoming; use hyper::service::make_service_fn; use hyper::{Server as HyperServer}; @@ -15,14 +15,27 @@ use crate::Request; use crate::Reply; use crate::reject::IsReject; +/// Create a `Server` with the provided filter +pub fn serve(filter: F) -> Server> where + F: crate::Filter + Send + Sync + 'static, + F::Extract: Reply, + F::Error: IsReject, +{ + Server { + pipeline: false, + service: filter.into_service(), + } +} + /// Create a `Server` with the provided service. -pub fn serve(service: S) -> Server -where - S: IntoWarpService + 'static + Clone, +pub fn serve_service(service: S) -> Server> where + S: tower_service::Service + Send + Sync + 'static, + S::Error: crate::reject::Reject, + S::Future: Send, { Server { pipeline: false, - service, + service: service.into_warp_service(), } } @@ -72,7 +85,7 @@ macro_rules! into_service { make_service_fn(move |transport| { let inner = inner.clone(); let remote_addr = Transport::remote_addr(transport); - future::ok::<_, hyper::Error>(HyperService{ filter: inner.into_warp_service() , remote_addr }) + future::ok::<_, hyper::Error>(HyperService{ filter: inner , remote_addr }) }) }}; } @@ -134,9 +147,8 @@ macro_rules! try_bind { // ===== impl Server ===== -impl Server -where - S: IntoWarpService + 'static + Send + Clone, +impl Server where +S: WarpService + 'static + Send + Clone, { /// Run this `Server` forever on the current thread. pub async fn run(self, addr: impl Into + 'static) { @@ -183,7 +195,8 @@ where pub fn bind( self, addr: impl Into + 'static, - ) -> impl Future + 'static { + ) -> impl Future + 'static + { let (_, fut) = self.bind_ephemeral(addr); fut } @@ -196,7 +209,8 @@ where pub async fn try_bind( self, addr: impl Into + 'static, - ) { + ) + { let addr = addr.into(); let srv = match try_bind!(self, &addr) { Ok((_, srv)) => srv, @@ -224,7 +238,8 @@ where pub fn bind_ephemeral( self, addr: impl Into + 'static, - ) -> (SocketAddr, impl Future + 'static) { + ) -> (SocketAddr, impl Future + 'static) + { let (addr, srv) = bind!(self, addr); let srv = srv.map(|result| { if let Err(err) = result { @@ -295,7 +310,8 @@ where self, addr: impl Into + 'static, signal: impl Future + Send + 'static, - ) -> (SocketAddr, impl Future + 'static) { + ) -> (SocketAddr, impl Future + 'static) + { let (addr, srv) = bind!(self, addr); let fut = srv .with_graceful_shutdown(signal) @@ -364,9 +380,8 @@ where #[cfg(feature = "tls")] impl TlsServer where - S: IntoWarpService + 'static, - <::Reply as TryFuture>::Ok: Reply + Send, - <::Reply as TryFuture>::Error: IsReject + Send, + S: WarpService + 'static + Send + Clone, + { /// Run this `TlsServer` forever on the current thread. /// @@ -457,4 +472,3 @@ pub trait WarpService { type Reply: Future, std::convert::Infallible>> + Send; fn call(&mut self, req: Request, remote_addr: Option) -> Self::Reply; } -