Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] tower_service::Service <-> warp::Filter interop #322

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -40,6 +40,7 @@ rustls = { version = "0.16", optional = true }
tungstenite = { default-features = false, version = "0.9", optional = true }
urlencoding = "1.0.0"
pin-project = "0.4.5"
tower-service = "0.3.0-alpha.2"

[dev-dependencies]
pretty_env_logger = "0.3"
Expand Down
26 changes: 26 additions & 0 deletions examples/hyper.rs
@@ -0,0 +1,26 @@
extern crate warp;
extern crate hyper;

use warp::Filter;
use hyper::Server;
use hyper::service::{make_service_fn, service_fn};
use tower_service::Service;

#[tokio::main]
async fn main() {
let addr = ([0, 0, 0, 0], 3030).into();

let routes = warp::any().map(|| "hello world");

let make_service = make_service_fn(move |_| {
let routes = routes.clone();
futures::future::ok::<_, hyper::Error>(service_fn(move |req| routes.into_service().call(req)))
});

let server = Server::bind(&addr)
.serve(make_service);

if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
}
40 changes: 40 additions & 0 deletions examples/tower_service.rs
@@ -0,0 +1,40 @@
extern crate warp;
extern crate hyper;

use std::task::{Context, Poll};
use std::pin::Pin;
use std::future::Future;
use warp::{Response, Request};
use warp::reject::Reject;
use tower_service::Service;

#[derive(Clone)]
struct TowerService;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use service_fn as an example here


#[derive(Debug)]
struct ServiceError;
impl Reject for ServiceError {}

impl Service<Request> for TowerService {
type Response = Response;
type Error = ServiceError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, _req: Request) -> Self::Future {
// Create the HTTP response
let resp = Response::new("hello world".into());

// Return the response as an immediate future
Box::pin(futures::future::ok(resp))
}
}

#[tokio::main]
async fn main() {

warp::serve_service(TowerService).run(([127, 0, 0, 1], 3030)).await;
}
23 changes: 23 additions & 0 deletions src/filter/mod.rs
Expand Up @@ -31,6 +31,7 @@ use self::recover::Recover;
use self::unify::Unify;
use self::untuple_one::UntupleOne;
pub(crate) use self::wrap::{Wrap, WrapSealed};
pub use service::{FilteredService, TowerService};

// A crate-private base trait, allowing the actual `filter` method to change
// signatures without it being a breaking change.
Expand Down Expand Up @@ -397,6 +398,28 @@ pub trait Filter: FilterBase {
{
BoxedFilter::new(self)
}

/// Wrap the `Filter` so that it implements `tower_service::Service`
///
/// # Example
///
/// ```
/// use warp::Filter;
/// use tower_service::Service as TowerService;
/// use warp::Request;
///
/// fn tower_service() -> impl TowerService<Request> {
/// warp::any().map(|| "ok").into_service()
/// }
/// ```
fn into_service(self) -> FilteredService<Self>
where
Self: Sized + Send + Sync + 'static,
Self::Extract: crate::Reply,
Self::Error: IsReject,
{
FilteredService{ filter: self }
}
}

impl<T: FilterBase> Filter for T {}
Expand Down
105 changes: 88 additions & 17 deletions src/filter/service.rs
Expand Up @@ -14,7 +14,7 @@ use crate::{Filter, Request};

#[derive(Copy, Clone, Debug)]
pub struct FilteredService<F> {
filter: F,
pub(crate) filter: F,
}

impl<F> WarpService for FilteredService<F>
Expand All @@ -26,7 +26,7 @@ where
type Reply = FilteredFuture<F::Future>;

#[inline]
fn call(&self, req: Request, remote_addr: Option<SocketAddr>) -> Self::Reply {
fn call(&mut self, req: Request, remote_addr: Option<SocketAddr>) -> Self::Reply {
debug_assert!(!route::is_set(), "nested route::set calls");

let route = Route::new(req, remote_addr);
Expand All @@ -49,43 +49,114 @@ pub struct FilteredFuture<F> {
impl<F> Future for FilteredFuture<F>
where
F: TryFuture,
F::Ok: Reply,
F::Error: IsReject,
{
type Output = Result<F::Ok, F::Error>;
type Output = Result<crate::reply::Response, std::convert::Infallible>;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
debug_assert!(!route::is_set(), "nested route::set calls");

let pin = self.project();
let fut = pin.future;
route::set(&pin.route, || fut.try_poll(cx))

match route::set(pin.route, || fut.try_poll(cx)) {
Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok.into_response())),
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
log::debug!("rejected: {:?}", err);
Poll::Ready(Ok(err.into_response()))
}
}
}
}

impl<F> IntoWarpService for FilteredService<F>
#[derive(Copy, Clone, Debug)]
pub struct TowerService<F> {
pub(crate) service: F,
}

#[pin_project]
#[derive(Debug)]
pub struct TowerServiceFuture<F> {
#[pin]
pub(crate) future: F,
}

impl<S> IntoWarpService for S
where
F: Filter + Send + Sync + 'static,
F::Extract: Reply,
F::Error: IsReject,
S: tower_service::Service<crate::Request, Response = crate::Response> + Send + Sync + 'static,
S::Error: crate::reject::Reject,
S::Future: Send,
{
type Service = FilteredService<F>;
type Service = TowerService<S>;

#[inline]
fn into_warp_service(self) -> Self::Service {
self
TowerService{ service: self }
}
}

impl<F> IntoWarpService for F
impl<S> WarpService for TowerService<S>
where
F: Filter + Send + Sync + 'static,
F::Extract: Reply,
F::Error: IsReject,
S: tower_service::Service<crate::Request, Response = crate::Response> + Send + Sync + 'static,
S::Error: crate::reject::Reject,
S::Future: Send
{
type Service = FilteredService<F>;
type Reply = TowerServiceFuture<S::Future>;

fn call(&mut self, req: Request, _remote_addr: Option<SocketAddr>) -> Self::Reply {

TowerServiceFuture{ future: self.service.call(req) }
}
}

impl<S> Future for TowerServiceFuture<S>
where
S: TryFuture<Ok = crate::Response>,
S::Error: crate::reject::Reject,
{
type Output = Result<crate::reply::Response, std::convert::Infallible>;

#[inline]
fn into_warp_service(self) -> Self::Service {
FilteredService { filter: self }
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
debug_assert!(!route::is_set(), "nested route::set calls");

let pin = self.project();
let fut = pin.future;

match fut.try_poll(cx) {
Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)),
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
log::debug!("rejected: {:?}", err);
Poll::Ready(Ok(crate::reject::custom(err).into_response()))
}
}
}
}

impl<F> tower_service::Service<crate::Request> for FilteredService<F>
where
F: Filter,
<F::Future as TryFuture>::Ok: Reply,
<F::Future as TryFuture>::Error: IsReject,
{
type Response = crate::Response;
type Error = std::convert::Infallible;
type Future = FilteredFuture<F::Future>;

fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: crate::Request) -> Self::Future {
let route = Route::new(req, None);
let fut = route::set(&route, || self.filter.filter());
FilteredFuture {
future: fut,
route,
}
}
}
10 changes: 7 additions & 3 deletions src/lib.rs
Expand Up @@ -151,15 +151,19 @@ pub use self::redirect::redirect;
pub use self::reject::{reject, Rejection};
#[doc(hidden)]
pub use self::reply::{reply, Reply};
pub use self::server::{serve, Server};
pub use self::server::{serve, serve_service, Server};
pub use hyper::rt::spawn;
#[doc(hidden)]
pub use http;
pub use hyper::rt::spawn;

#[doc(hidden)]
pub use bytes::Buf;
#[doc(hidden)]
pub use futures::{Future, Sink, Stream};
#[doc(hidden)]

pub(crate) type Request = http::Request<hyper::Body>;
///type alias for a Hyper Request
pub type Request = http::Request<hyper::Body>;

///type alias for a Hyper Request
pub type Response = hyper::Response<hyper::Body>;