diff --git a/examples/tower_service.rs b/examples/tower_service.rs new file mode 100644 index 000000000..ecad3613e --- /dev/null +++ b/examples/tower_service.rs @@ -0,0 +1,40 @@ +extern crate warp; +extern crate hyper; + +use std::task::{Context, Poll}; +use std::pin::Pin; +use std::future::Future; +use warp::{Response, Request}; +use warp::reject::Reject; +use tower_service::Service; + +#[derive(Clone)] +struct TowerService; + +#[derive(Debug)] +struct ServiceError; +impl Reject for ServiceError {} + +impl Service for TowerService { + type Response = Response; + type Error = ServiceError; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _req: Request) -> Self::Future { + // Create the HTTP response + let resp = Response::new("hello world".into()); + + // Return the response as an immediate future + Box::pin(futures::future::ok(resp)) + } +} + +#[tokio::main] +async fn main() { + + warp::serve_service(TowerService).run(([127, 0, 0, 1], 3030)).await; +} diff --git a/src/filter/mod.rs b/src/filter/mod.rs index 6779199cb..fc2394443 100644 --- a/src/filter/mod.rs +++ b/src/filter/mod.rs @@ -31,7 +31,7 @@ use self::recover::Recover; use self::unify::Unify; use self::untuple_one::UntupleOne; pub(crate) use self::wrap::{Wrap, WrapSealed}; -pub use service::FilteredService; +pub use service::{FilteredService, TowerService}; // A crate-private base trait, allowing the actual `filter` method to change // signatures without it being a breaking change. @@ -404,12 +404,11 @@ pub trait Filter: FilterBase { /// # Example /// /// ``` - /// # extern crate warp; - /// # extern crate tower_service; /// use warp::Filter; /// use tower_service::Service as TowerService; + /// use warp::Request; /// - /// fn tower_service() -> impl TowerService { + /// fn tower_service() -> impl TowerService { /// warp::any().map(|| "ok").into_service() /// } /// ``` diff --git a/src/filter/service.rs b/src/filter/service.rs index 83ddb3430..977d38a37 100644 --- a/src/filter/service.rs +++ b/src/filter/service.rs @@ -72,31 +72,68 @@ where } } -impl IntoWarpService for FilteredService +#[derive(Copy, Clone, Debug)] +pub struct TowerService { + pub(crate) service: F, +} + +#[pin_project] +#[derive(Debug)] +pub struct TowerServiceFuture { + #[pin] + pub(crate) future: F, +} + +impl IntoWarpService for S where - F: Filter + Send + Sync + 'static, - F::Extract: Reply, - F::Error: IsReject, + S: tower_service::Service + Send + Sync + 'static, + S::Error: crate::reject::Reject, + S::Future: Send, { - type Service = FilteredService; + type Service = TowerService; #[inline] fn into_warp_service(self) -> Self::Service { - self + TowerService{ service: self } } } -impl IntoWarpService for F +impl WarpService for TowerService where - F: Filter + Send + Sync + 'static, - F::Extract: Reply, - F::Error: IsReject, + S: tower_service::Service + Send + Sync + 'static, + S::Error: crate::reject::Reject, + S::Future: Send +{ + type Reply = TowerServiceFuture; + + fn call(&mut self, req: Request, _remote_addr: Option) -> Self::Reply { + + TowerServiceFuture{ future: self.service.call(req) } + } +} + +impl Future for TowerServiceFuture +where + S: TryFuture, + S::Error: crate::reject::Reject, { - type Service = FilteredService; + type Output = Result; #[inline] - fn into_warp_service(self) -> Self::Service { - FilteredService { filter: self } + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + debug_assert!(!route::is_set(), "nested route::set calls"); + + let pin = self.project(); + let fut = pin.future; + + match fut.try_poll(cx) { + Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)), + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => { + log::debug!("rejected: {:?}", err); + Poll::Ready(Ok(crate::reject::custom(err).into_response())) + } + } } } diff --git a/src/lib.rs b/src/lib.rs index f33da73c7..4a292a9c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,10 +151,10 @@ pub use self::redirect::redirect; pub use self::reject::{reject, Rejection}; #[doc(hidden)] pub use self::reply::{reply, Reply}; -pub use self::server::{serve, Server}; +pub use self::server::{serve, serve_service, Server}; +pub use hyper::rt::spawn; #[doc(hidden)] pub use http; -pub use hyper::rt::spawn; #[doc(hidden)] pub use bytes::Buf; @@ -162,5 +162,8 @@ pub use bytes::Buf; pub use futures::{Future, Sink, Stream}; #[doc(hidden)] -pub(crate) type Request = http::Request; -pub(crate) type Response = hyper::Response; +///type alias for a Hyper Request +pub type Request = http::Request; + +///type alias for a Hyper Request +pub type Response = hyper::Response; diff --git a/src/server.rs b/src/server.rs index 7e6a18ea4..33dafb8b7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,7 +4,7 @@ use std::net::SocketAddr; use std::path::Path; use std::future::Future; -use futures::{future, FutureExt, TryFuture, TryStream, TryStreamExt}; +use futures::{future, FutureExt, TryStream, TryStreamExt}; use hyper::server::conn::AddrIncoming; use hyper::service::make_service_fn; use hyper::{Server as HyperServer}; @@ -15,14 +15,27 @@ use crate::Request; use crate::Reply; use crate::reject::IsReject; +/// Create a `Server` with the provided filter +pub fn serve(filter: F) -> Server> where + F: crate::Filter + Send + Sync + 'static, + F::Extract: Reply, + F::Error: IsReject, +{ + Server { + pipeline: false, + service: filter.into_service(), + } +} + /// Create a `Server` with the provided service. -pub fn serve(service: S) -> Server -where - S: IntoWarpService + 'static + Clone, +pub fn serve_service(service: S) -> Server> where + S: tower_service::Service + Send + Sync + 'static, + S::Error: crate::reject::Reject, + S::Future: Send, { Server { pipeline: false, - service, + service: service.into_warp_service(), } } @@ -72,7 +85,7 @@ macro_rules! into_service { make_service_fn(move |transport| { let inner = inner.clone(); let remote_addr = Transport::remote_addr(transport); - future::ok::<_, hyper::Error>(HyperService{ filter: inner.into_warp_service() , remote_addr }) + future::ok::<_, hyper::Error>(HyperService{ filter: inner , remote_addr }) }) }}; } @@ -134,9 +147,8 @@ macro_rules! try_bind { // ===== impl Server ===== -impl Server -where - S: IntoWarpService + 'static + Send + Clone, +impl Server where +S: WarpService + 'static + Send + Clone, { /// Run this `Server` forever on the current thread. pub async fn run(self, addr: impl Into + 'static) { @@ -183,7 +195,8 @@ where pub fn bind( self, addr: impl Into + 'static, - ) -> impl Future + 'static { + ) -> impl Future + 'static + { let (_, fut) = self.bind_ephemeral(addr); fut } @@ -196,7 +209,8 @@ where pub async fn try_bind( self, addr: impl Into + 'static, - ) { + ) + { let addr = addr.into(); let srv = match try_bind!(self, &addr) { Ok((_, srv)) => srv, @@ -224,7 +238,8 @@ where pub fn bind_ephemeral( self, addr: impl Into + 'static, - ) -> (SocketAddr, impl Future + 'static) { + ) -> (SocketAddr, impl Future + 'static) + { let (addr, srv) = bind!(self, addr); let srv = srv.map(|result| { if let Err(err) = result { @@ -295,7 +310,8 @@ where self, addr: impl Into + 'static, signal: impl Future + Send + 'static, - ) -> (SocketAddr, impl Future + 'static) { + ) -> (SocketAddr, impl Future + 'static) + { let (addr, srv) = bind!(self, addr); let fut = srv .with_graceful_shutdown(signal) @@ -364,9 +380,8 @@ where #[cfg(feature = "tls")] impl TlsServer where - S: IntoWarpService + 'static, - <::Reply as TryFuture>::Ok: Reply + Send, - <::Reply as TryFuture>::Error: IsReject + Send, + S: WarpService + 'static + Send + Clone, + { /// Run this `TlsServer` forever on the current thread. /// @@ -457,4 +472,3 @@ pub trait WarpService { type Reply: Future, std::convert::Infallible>> + Send; fn call(&mut self, req: Request, remote_addr: Option) -> Self::Reply; } -