Skip to content

Commit

Permalink
impl IntoWarpService for T: tower_service::Service<Request<Body>>
Browse files Browse the repository at this point in the history
add warp::serve_service which runs tower_service::Service<Request<Body>>
  • Loading branch information
jxs committed Nov 27, 2019
1 parent 2a38e96 commit 5ffec7d
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 38 deletions.
40 changes: 40 additions & 0 deletions examples/tower_service.rs
@@ -0,0 +1,40 @@
extern crate warp;
extern crate hyper;

use std::task::{Context, Poll};
use std::pin::Pin;
use std::future::Future;
use warp::{Response, Request};
use warp::reject::Reject;
use tower_service::Service;

#[derive(Clone)]
struct TowerService;

#[derive(Debug)]
struct ServiceError;
impl Reject for ServiceError {}

impl Service<Request> for TowerService {
type Response = Response;
type Error = ServiceError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, _req: Request) -> Self::Future {
// Create the HTTP response
let resp = Response::new("hello world".into());

// Return the response as an immediate future
Box::pin(futures::future::ok(resp))
}
}

#[tokio::main]
async fn main() {

warp::serve_service(TowerService).run(([127, 0, 0, 1], 3030)).await;
}
7 changes: 3 additions & 4 deletions src/filter/mod.rs
Expand Up @@ -31,7 +31,7 @@ use self::recover::Recover;
use self::unify::Unify;
use self::untuple_one::UntupleOne;
pub(crate) use self::wrap::{Wrap, WrapSealed};
pub use service::FilteredService;
pub use service::{FilteredService, TowerService};

// A crate-private base trait, allowing the actual `filter` method to change
// signatures without it being a breaking change.
Expand Down Expand Up @@ -404,12 +404,11 @@ pub trait Filter: FilterBase {
/// # Example
///
/// ```
/// # extern crate warp;
/// # extern crate tower_service;
/// use warp::Filter;
/// use tower_service::Service as TowerService;
/// use warp::Request;
///
/// fn tower_service() -> impl TowerService {
/// fn tower_service() -> impl TowerService<Request> {
/// warp::any().map(|| "ok").into_service()
/// }
/// ```
Expand Down
63 changes: 50 additions & 13 deletions src/filter/service.rs
Expand Up @@ -72,31 +72,68 @@ where
}
}

impl<F> IntoWarpService for FilteredService<F>
#[derive(Copy, Clone, Debug)]
pub struct TowerService<F> {
pub(crate) service: F,
}

#[pin_project]
#[derive(Debug)]
pub struct TowerServiceFuture<F> {
#[pin]
pub(crate) future: F,
}

impl<S> IntoWarpService for S
where
F: Filter + Send + Sync + 'static,
F::Extract: Reply,
F::Error: IsReject,
S: tower_service::Service<crate::Request, Response = crate::Response> + Send + Sync + 'static,
S::Error: crate::reject::Reject,
S::Future: Send,
{
type Service = FilteredService<F>;
type Service = TowerService<S>;

#[inline]
fn into_warp_service(self) -> Self::Service {
self
TowerService{ service: self }
}
}

impl<F> IntoWarpService for F
impl<S> WarpService for TowerService<S>
where
F: Filter + Send + Sync + 'static,
F::Extract: Reply,
F::Error: IsReject,
S: tower_service::Service<crate::Request, Response = crate::Response> + Send + Sync + 'static,
S::Error: crate::reject::Reject,
S::Future: Send
{
type Reply = TowerServiceFuture<S::Future>;

fn call(&mut self, req: Request, _remote_addr: Option<SocketAddr>) -> Self::Reply {

TowerServiceFuture{ future: self.service.call(req) }
}
}

impl<S> Future for TowerServiceFuture<S>
where
S: TryFuture<Ok = crate::Response>,
S::Error: crate::reject::Reject,
{
type Service = FilteredService<F>;
type Output = Result<crate::reply::Response, std::convert::Infallible>;

#[inline]
fn into_warp_service(self) -> Self::Service {
FilteredService { filter: self }
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
debug_assert!(!route::is_set(), "nested route::set calls");

let pin = self.project();
let fut = pin.future;

match fut.try_poll(cx) {
Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)),
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
log::debug!("rejected: {:?}", err);
Poll::Ready(Ok(crate::reject::custom(err).into_response()))
}
}
}
}

Expand Down
11 changes: 7 additions & 4 deletions src/lib.rs
Expand Up @@ -151,16 +151,19 @@ pub use self::redirect::redirect;
pub use self::reject::{reject, Rejection};
#[doc(hidden)]
pub use self::reply::{reply, Reply};
pub use self::server::{serve, Server};
pub use self::server::{serve, serve_service, Server};
pub use hyper::rt::spawn;
#[doc(hidden)]
pub use http;
pub use hyper::rt::spawn;

#[doc(hidden)]
pub use bytes::Buf;
#[doc(hidden)]
pub use futures::{Future, Sink, Stream};
#[doc(hidden)]

pub(crate) type Request = http::Request<hyper::Body>;
pub(crate) type Response = hyper::Response<hyper::Body>;
///type alias for a Hyper Request
pub type Request = http::Request<hyper::Body>;

///type alias for a Hyper Request
pub type Response = hyper::Response<hyper::Body>;
48 changes: 31 additions & 17 deletions src/server.rs
Expand Up @@ -4,7 +4,7 @@ use std::net::SocketAddr;
use std::path::Path;
use std::future::Future;

use futures::{future, FutureExt, TryFuture, TryStream, TryStreamExt};
use futures::{future, FutureExt, TryStream, TryStreamExt};
use hyper::server::conn::AddrIncoming;
use hyper::service::make_service_fn;
use hyper::{Server as HyperServer};
Expand All @@ -15,14 +15,27 @@ use crate::Request;
use crate::Reply;
use crate::reject::IsReject;

/// Create a `Server` with the provided filter
pub fn serve<F>(filter: F) -> Server<crate::filter::FilteredService<F>> where
F: crate::Filter + Send + Sync + 'static,
F::Extract: Reply,
F::Error: IsReject,
{
Server {
pipeline: false,
service: filter.into_service(),
}
}

/// Create a `Server` with the provided service.
pub fn serve<S>(service: S) -> Server<S>
where
S: IntoWarpService + 'static + Clone,
pub fn serve_service<S>(service: S) -> Server<crate::filter::TowerService<S>> where
S: tower_service::Service<crate::Request, Response = crate::Response> + Send + Sync + 'static,
S::Error: crate::reject::Reject,
S::Future: Send,
{
Server {
pipeline: false,
service,
service: service.into_warp_service(),
}
}

Expand Down Expand Up @@ -72,7 +85,7 @@ macro_rules! into_service {
make_service_fn(move |transport| {
let inner = inner.clone();
let remote_addr = Transport::remote_addr(transport);
future::ok::<_, hyper::Error>(HyperService{ filter: inner.into_warp_service() , remote_addr })
future::ok::<_, hyper::Error>(HyperService{ filter: inner , remote_addr })
})
}};
}
Expand Down Expand Up @@ -134,9 +147,8 @@ macro_rules! try_bind {

// ===== impl Server =====

impl<S> Server<S>
where
S: IntoWarpService + 'static + Send + Clone,
impl<S> Server<S> where
S: WarpService + 'static + Send + Clone,
{
/// Run this `Server` forever on the current thread.
pub async fn run(self, addr: impl Into<SocketAddr> + 'static) {
Expand Down Expand Up @@ -183,7 +195,8 @@ where
pub fn bind(
self,
addr: impl Into<SocketAddr> + 'static,
) -> impl Future<Output = ()> + 'static {
) -> impl Future<Output = ()> + 'static
{
let (_, fut) = self.bind_ephemeral(addr);
fut
}
Expand All @@ -196,7 +209,8 @@ where
pub async fn try_bind(
self,
addr: impl Into<SocketAddr> + 'static,
) {
)
{
let addr = addr.into();
let srv = match try_bind!(self, &addr) {
Ok((_, srv)) => srv,
Expand Down Expand Up @@ -224,7 +238,8 @@ where
pub fn bind_ephemeral(
self,
addr: impl Into<SocketAddr> + 'static,
) -> (SocketAddr, impl Future<Output = ()> + 'static) {
) -> (SocketAddr, impl Future<Output = ()> + 'static)
{
let (addr, srv) = bind!(self, addr);
let srv = srv.map(|result| {
if let Err(err) = result {
Expand Down Expand Up @@ -295,7 +310,8 @@ where
self,
addr: impl Into<SocketAddr> + 'static,
signal: impl Future<Output = ()> + Send + 'static,
) -> (SocketAddr, impl Future<Output = ()> + 'static) {
) -> (SocketAddr, impl Future<Output = ()> + 'static)
{
let (addr, srv) = bind!(self, addr);
let fut = srv
.with_graceful_shutdown(signal)
Expand Down Expand Up @@ -364,9 +380,8 @@ where
#[cfg(feature = "tls")]
impl<S> TlsServer<S>
where
S: IntoWarpService + 'static,
<<S::Service as WarpService>::Reply as TryFuture>::Ok: Reply + Send,
<<S::Service as WarpService>::Reply as TryFuture>::Error: IsReject + Send,
S: WarpService + 'static + Send + Clone,

{
/// Run this `TlsServer` forever on the current thread.
///
Expand Down Expand Up @@ -457,4 +472,3 @@ pub trait WarpService {
type Reply: Future<Output = Result<hyper::Response<hyper::Body>, std::convert::Infallible>> + Send;
fn call(&mut self, req: Request, remote_addr: Option<SocketAddr>) -> Self::Reply;
}

0 comments on commit 5ffec7d

Please sign in to comment.