Skip to content

Commit

Permalink
- impl tower_service::Service for FilteredFuture
Browse files Browse the repository at this point in the history
- add Filter::into_service function that converts a Warp Filter into a
 tower_service::Service
- add example running a warp Filter on Hyper
  • Loading branch information
jxs committed Nov 26, 2019
1 parent ea36d96 commit 2a38e96
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ rustls = { version = "0.16", optional = true }
tungstenite = { default-features = false, version = "0.9", optional = true }
urlencoding = "1.0.0"
pin-project = "0.4.5"
tower-service = "0.3.0-alpha.2"

[dev-dependencies]
pretty_env_logger = "0.3"
Expand Down
26 changes: 26 additions & 0 deletions examples/hyper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
extern crate warp;
extern crate hyper;

use warp::Filter;
use hyper::Server;
use hyper::service::{make_service_fn, service_fn};
use tower_service::Service;

#[tokio::main]
async fn main() {
let addr = ([0, 0, 0, 0], 3030).into();

let routes = warp::any().map(|| "hello world");

let make_service = make_service_fn(move |_| {
let routes = routes.clone();
futures::future::ok::<_, hyper::Error>(service_fn(move |req| routes.into_service().call(req)))
});

let server = Server::bind(&addr)
.serve(make_service);

if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
}
24 changes: 24 additions & 0 deletions src/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use self::recover::Recover;
use self::unify::Unify;
use self::untuple_one::UntupleOne;
pub(crate) use self::wrap::{Wrap, WrapSealed};
pub use service::FilteredService;

// A crate-private base trait, allowing the actual `filter` method to change
// signatures without it being a breaking change.
Expand Down Expand Up @@ -397,6 +398,29 @@ pub trait Filter: FilterBase {
{
BoxedFilter::new(self)
}

/// Wrap the `Filter` so that it implements `tower_service::Service`
///
/// # Example
///
/// ```
/// # extern crate warp;
/// # extern crate tower_service;
/// use warp::Filter;
/// use tower_service::Service as TowerService;
///
/// fn tower_service() -> impl TowerService {
/// warp::any().map(|| "ok").into_service()
/// }
/// ```
fn into_service(self) -> FilteredService<Self>
where
Self: Sized + Send + Sync + 'static,
Self::Extract: crate::Reply,
Self::Error: IsReject,
{
FilteredService{ filter: self }
}
}

impl<T: FilterBase> Filter for T {}
Expand Down
28 changes: 26 additions & 2 deletions src/filter/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{Filter, Request};

#[derive(Copy, Clone, Debug)]
pub struct FilteredService<F> {
filter: F,
pub(crate) filter: F,
}

impl<F> WarpService for FilteredService<F>
Expand All @@ -26,7 +26,7 @@ where
type Reply = FilteredFuture<F::Future>;

#[inline]
fn call(&self, req: Request, remote_addr: Option<SocketAddr>) -> Self::Reply {
fn call(&mut self, req: Request, remote_addr: Option<SocketAddr>) -> Self::Reply {
debug_assert!(!route::is_set(), "nested route::set calls");

let route = Route::new(req, remote_addr);
Expand Down Expand Up @@ -99,3 +99,27 @@ where
FilteredService { filter: self }
}
}

impl<F> tower_service::Service<crate::Request> for FilteredService<F>
where
F: Filter,
<F::Future as TryFuture>::Ok: Reply,
<F::Future as TryFuture>::Error: IsReject,
{
type Response = crate::Response;
type Error = std::convert::Infallible;
type Future = FilteredFuture<F::Future>;

fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: crate::Request) -> Self::Future {
let route = Route::new(req, None);
let fut = route::set(&route, || self.filter.filter());
FilteredFuture {
future: fut,
route,
}
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,4 @@ pub use futures::{Future, Sink, Stream};
#[doc(hidden)]

pub(crate) type Request = http::Request<hyper::Body>;
pub(crate) type Response = hyper::Response<hyper::Body>;
42 changes: 30 additions & 12 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,23 @@ use std::error::Error as StdError;
use std::net::SocketAddr;
#[cfg(feature = "tls")]
use std::path::Path;
use std::sync::Arc;
use std::future::Future;

use futures::{future, FutureExt, TryFuture, TryStream, TryStreamExt};
use hyper::server::conn::AddrIncoming;
use hyper::service::{make_service_fn, service_fn};
use hyper::service::make_service_fn;
use hyper::{Server as HyperServer};
use tokio::io::{AsyncRead, AsyncWrite};

use crate::transport::Transport;
use crate::Request;
use crate::reply::Reply;
use crate::Reply;
use crate::reject::IsReject;

/// Create a `Server` with the provided service.
pub fn serve<S>(service: S) -> Server<S>
where
S: IntoWarpService + 'static,
S: IntoWarpService + 'static + Clone,
{
Server {
pipeline: false,
Expand All @@ -43,17 +42,37 @@ pub struct TlsServer<S> {
tls: ::rustls::ServerConfig,
}

pub struct HyperService<F> {
filter: F,
remote_addr: Option<SocketAddr>,
}

impl<F> tower_service::Service<crate::Request> for HyperService<F>
where
F: WarpService,
{
type Response = crate::Response;
type Error = std::convert::Infallible;
type Future = F::Reply;

fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}

fn call(&mut self, req: crate::Request) -> Self::Future {
self.filter.call(req, self.remote_addr)
}
}

// Getting all various generic bounds to make this a re-usable method is
// very complicated, so instead this is just a macro.
macro_rules! into_service {
($into:expr) => {{
let inner = Arc::new($into.into_warp_service());
let inner = $into;
make_service_fn(move |transport| {
let inner = inner.clone();
let remote_addr = Transport::remote_addr(transport);
future::ok::<_, hyper::Error>(service_fn(move |req|
inner.call(req, remote_addr),
))
future::ok::<_, hyper::Error>(HyperService{ filter: inner.into_warp_service() , remote_addr })
})
}};
}
Expand Down Expand Up @@ -117,9 +136,7 @@ macro_rules! try_bind {

impl<S> Server<S>
where
S: IntoWarpService + 'static + Send,
<<S::Service as WarpService>::Reply as TryFuture>::Ok: Reply + Send,
<<S::Service as WarpService>::Reply as TryFuture>::Error: IsReject + Send,
S: IntoWarpService + 'static + Send + Clone,
{
/// Run this `Server` forever on the current thread.
pub async fn run(self, addr: impl Into<SocketAddr> + 'static) {
Expand Down Expand Up @@ -438,5 +455,6 @@ pub trait IntoWarpService {

pub trait WarpService {
type Reply: Future<Output = Result<hyper::Response<hyper::Body>, std::convert::Infallible>> + Send;
fn call(&self, req: Request, remote_addr: Option<SocketAddr>) -> Self::Reply;
fn call(&mut self, req: Request, remote_addr: Option<SocketAddr>) -> Self::Reply;
}

2 changes: 1 addition & 1 deletion src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ impl WsBuilder {
/// ```
pub async fn handshake<F>(self, f: F) -> Result<WsClient, WsError>
where
F: Filter + Send + Sync + 'static,
F: Filter + Send + Sync + 'static + Clone,
F::Extract: Reply + Send,
F::Error: IsReject + Send,
{
Expand Down
2 changes: 1 addition & 1 deletion tests/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async fn limit_message_size() {
assert!(client.recv().await.is_err());
}

fn ws_echo() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> {
fn ws_echo() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::ws().map(|ws: warp::ws::Ws| {
ws.on_upgrade(|websocket| {
// Just echo all messages back...
Expand Down

0 comments on commit 2a38e96

Please sign in to comment.