From 511eb697abd369de4171ffb31a3a549244892e2c Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 17 Apr 2018 13:03:59 -0700 Subject: [PATCH] feat(service): introduce hyper-specific `Service` This introduces the `hyper::service` module, which replaces `tokio-service`. Since the trait is specific to hyper, its associated types have been adjusted. It didn't make sense to need to define `Service`, since we already know the context is HTTP. Instead, the request and response bodies are associated types now, and slightly stricter bounds have been placed on `Error`. The helpers `service_fn` and `service_fn_ok` should be sufficient for now to ease creating `Service`s. The `NewService` trait now allows service creation to also be asynchronous. These traits are similar to `tower` in nature, and possibly will be replaced completely by it in the future. For now, hyper defining its own allows the traits to have better context, and prevents breaking changes in `tower` from affecting hyper. Closes #1461 BREAKING CHANGE: The `Service` trait has changed: it has some changed associated types, and `call` is now bound to `&mut self`. The `NewService` trait has changed: it has some changed associated types, and `new_service` now returns a `Future`. `Client` no longer implements `Service` for now. `hyper::server::conn::Serve` now returns `Connecting` instead of `Connection`s, since `new_service` can now return a `Future`. The `Connecting` is a future wrapping the new service future, returning a `Connection` afterwards. In many cases, `Future::flatten` can be used. --- Cargo.toml | 1 - examples/hello.rs | 18 ++-- examples/multi_server.rs | 35 +------ examples/params.rs | 129 ++++++++++++-------------- examples/send_file.rs | 184 ++++++++++++++++++------------------- examples/server.rs | 44 ++++----- examples/web_api.rs | 109 +++++++++++----------- src/client/mod.rs | 17 ---- src/common/mod.rs | 5 +- src/common/never.rs | 22 +++++ src/error.rs | 4 +- src/lib.rs | 2 +- src/proto/h1/dispatch.rs | 4 +- src/proto/h2/server.rs | 12 +-- src/server/conn.rs | 92 +++++++++++++------ src/server/mod.rs | 132 ++++++++++++++------------ src/server/service.rs | 64 ------------- src/service/mod.rs | 35 +++++++ src/service/new_service.rs | 55 +++++++++++ src/service/service.rs | 165 +++++++++++++++++++++++++++++++++ tests/server.rs | 80 ++++++---------- tests/support/mod.rs | 69 +++++++------- 22 files changed, 731 insertions(+), 547 deletions(-) create mode 100644 src/common/never.rs delete mode 100644 src/server/service.rs create mode 100644 src/service/mod.rs create mode 100644 src/service/new_service.rs create mode 100644 src/service/service.rs diff --git a/Cargo.toml b/Cargo.toml index 4d750d6630..90380e9fae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,6 @@ net2 = "0.2.32" time = "0.1" tokio = "0.1.5" tokio-executor = "0.1.0" -tokio-service = "0.1" tokio-io = "0.1" want = "0.0.3" diff --git a/examples/hello.rs b/examples/hello.rs index f3c476cd91..302f7bdc2a 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -6,8 +6,8 @@ extern crate tokio; use futures::Future; -use hyper::{Body, Response}; -use hyper::server::{Server, const_service, service_fn}; +use hyper::{Body, Response, Server}; +use hyper::service::service_fn_ok; static PHRASE: &'static [u8] = b"Hello World!"; @@ -16,10 +16,16 @@ fn main() { let addr = ([127, 0, 0, 1], 3000).into(); - let new_service = const_service(service_fn(|_| { - //TODO: when `!` is stable, replace error type - Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE))) - })); + // new_service is run for each connection, creating a 'service' + // to handle requests for that specific connection. + let new_service = || { + // This is the `Service` that will handle the connection. + // `service_fn_ok` is a helper to convert a function that + // returns a Response into a `Service`. + service_fn_ok(|_| { + Response::new(Body::from(PHRASE)) + }) + }; let server = Server::bind(&addr) .serve(new_service) diff --git a/examples/multi_server.rs b/examples/multi_server.rs index f8989fa334..3988561cac 100644 --- a/examples/multi_server.rs +++ b/examples/multi_server.rs @@ -5,39 +5,14 @@ extern crate pretty_env_logger; extern crate tokio; use futures::{Future}; -use futures::future::{FutureResult, lazy}; +use futures::future::{lazy}; -use hyper::{Body, Method, Request, Response, StatusCode}; -use hyper::server::{Server, Service}; +use hyper::{Body, Response, Server}; +use hyper::service::service_fn_ok; static INDEX1: &'static [u8] = b"The 1st service!"; static INDEX2: &'static [u8] = b"The 2nd service!"; -struct Srv(&'static [u8]); - -impl Service for Srv { - type Request = Request; - type Response = Response; - type Error = hyper::Error; - type Future = FutureResult, hyper::Error>; - - fn call(&self, req: Request) -> Self::Future { - futures::future::ok(match (req.method(), req.uri().path()) { - (&Method::GET, "/") => { - Response::new(self.0.into()) - }, - _ => { - Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::empty()) - .unwrap() - } - }) - } - -} - - fn main() { pretty_env_logger::init(); @@ -46,11 +21,11 @@ fn main() { tokio::run(lazy(move || { let srv1 = Server::bind(&addr1) - .serve(|| Ok(Srv(INDEX1))) + .serve(|| service_fn_ok(|_| Response::new(Body::from(INDEX1)))) .map_err(|e| eprintln!("server 1 error: {}", e)); let srv2 = Server::bind(&addr2) - .serve(|| Ok(Srv(INDEX2))) + .serve(|| service_fn_ok(|_| Response::new(Body::from(INDEX2)))) .map_err(|e| eprintln!("server 2 error: {}", e)); println!("Listening on http://{} and http://{}", addr1, addr2); diff --git a/examples/params.rs b/examples/params.rs index 05987a3d57..e3704f70ed 100644 --- a/examples/params.rs +++ b/examples/params.rs @@ -5,10 +5,10 @@ extern crate pretty_env_logger; extern crate tokio; extern crate url; -use futures::{Future, Stream}; +use futures::{future, Future, Stream}; -use hyper::{Body, Method, Request, Response, StatusCode}; -use hyper::server::{Server, Service}; +use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use hyper::service::service_fn; use std::collections::HashMap; use url::form_urlencoded; @@ -17,89 +17,80 @@ static INDEX: &[u8] = b"
Name: static MISSING: &[u8] = b"Missing field"; static NOTNUMERIC: &[u8] = b"Number field is not numeric"; -struct ParamExample; +// Using service_fn, we can turn this function into a `Service`. +fn param_example(req: Request) -> Box, Error=hyper::Error> + Send> { + match (req.method(), req.uri().path()) { + (&Method::GET, "/") | (&Method::GET, "/post") => { + Box::new(future::ok(Response::new(INDEX.into()))) + }, + (&Method::POST, "/post") => { + Box::new(req.into_body().concat2().map(|b| { + // Parse the request body. form_urlencoded::parse + // always succeeds, but in general parsing may + // fail (for example, an invalid post of json), so + // returning early with BadRequest may be + // necessary. + // + // Warning: this is a simplified use case. In + // principle names can appear multiple times in a + // form, and the values should be rolled up into a + // HashMap>. However in this + // example the simpler approach is sufficient. + let params = form_urlencoded::parse(b.as_ref()).into_owned().collect::>(); -impl Service for ParamExample { - type Request = Request; - type Response = Response; - type Error = hyper::Error; - type Future = Box + Send>; - - fn call(&self, req: Request) -> Self::Future { - match (req.method(), req.uri().path()) { - (&Method::GET, "/") | (&Method::GET, "/post") => { - Box::new(futures::future::ok(Response::new(INDEX.into()))) - }, - (&Method::POST, "/post") => { - Box::new(req.into_body().concat2().map(|b| { - // Parse the request body. form_urlencoded::parse - // always succeeds, but in general parsing may - // fail (for example, an invalid post of json), so - // returning early with BadRequest may be - // necessary. - // - // Warning: this is a simplified use case. In - // principle names can appear multiple times in a - // form, and the values should be rolled up into a - // HashMap>. However in this - // example the simpler approach is sufficient. - let params = form_urlencoded::parse(b.as_ref()).into_owned().collect::>(); - - // Validate the request parameters, returning - // early if an invalid input is detected. - let name = if let Some(n) = params.get("name") { - n + // Validate the request parameters, returning + // early if an invalid input is detected. + let name = if let Some(n) = params.get("name") { + n + } else { + return Response::builder() + .status(StatusCode::UNPROCESSABLE_ENTITY) + .body(MISSING.into()) + .unwrap(); + }; + let number = if let Some(n) = params.get("number") { + if let Ok(v) = n.parse::() { + v } else { return Response::builder() .status(StatusCode::UNPROCESSABLE_ENTITY) - .body(MISSING.into()) + .body(NOTNUMERIC.into()) .unwrap(); - }; - let number = if let Some(n) = params.get("number") { - if let Ok(v) = n.parse::() { - v - } else { - return Response::builder() - .status(StatusCode::UNPROCESSABLE_ENTITY) - .body(NOTNUMERIC.into()) - .unwrap(); - } - } else { - return Response::builder() - .status(StatusCode::UNPROCESSABLE_ENTITY) - .body(MISSING.into()) - .unwrap(); - }; + } + } else { + return Response::builder() + .status(StatusCode::UNPROCESSABLE_ENTITY) + .body(MISSING.into()) + .unwrap(); + }; - // Render the response. This will often involve - // calls to a database or web service, which will - // require creating a new stream for the response - // body. Since those may fail, other error - // responses such as InternalServiceError may be - // needed here, too. - let body = format!("Hello {}, your number is {}", name, number); - Response::new(body.into()) - })) - }, - _ => { - Box::new(futures::future::ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::empty()) - .unwrap())) - } + // Render the response. This will often involve + // calls to a database or web service, which will + // require creating a new stream for the response + // body. Since those may fail, other error + // responses such as InternalServiceError may be + // needed here, too. + let body = format!("Hello {}, your number is {}", name, number); + Response::new(body.into()) + })) + }, + _ => { + Box::new(future::ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()) + .unwrap())) } } } - fn main() { pretty_env_logger::init(); let addr = ([127, 0, 0, 1], 1337).into(); let server = Server::bind(&addr) - .serve(|| Ok(ParamExample)) + .serve(|| service_fn(param_example)) .map_err(|e| eprintln!("server error: {}", e)); tokio::run(server); diff --git a/examples/send_file.rs b/examples/send_file.rs index 222578425d..a72bebcd26 100644 --- a/examples/send_file.rs +++ b/examples/send_file.rs @@ -4,11 +4,11 @@ extern crate hyper; extern crate pretty_env_logger; extern crate tokio; -use futures::{Future/*, Sink*/}; +use futures::{future, Future}; use futures::sync::oneshot; -use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode}; -use hyper::server::{Server, Service}; +use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use hyper::service::service_fn; use std::fs::File; use std::io::{self, copy/*, Read*/}; @@ -17,7 +17,92 @@ use std::thread; static NOTFOUND: &[u8] = b"Not Found"; static INDEX: &str = "examples/send_file_index.html"; -fn simple_file_send(f: &str) -> Box, Error = io::Error> + Send> { + +fn main() { + pretty_env_logger::init(); + + let addr = "127.0.0.1:1337".parse().unwrap(); + + let server = Server::bind(&addr) + .serve(|| service_fn(response_examples)) + .map_err(|e| eprintln!("server error: {}", e)); + + println!("Listening on http://{}", addr); + + tokio::run(server); +} + +type ResponseFuture = Box, Error=io::Error> + Send>; + +fn response_examples(req: Request) -> ResponseFuture { + match (req.method(), req.uri().path()) { + (&Method::GET, "/") | (&Method::GET, "/index.html") => { + simple_file_send(INDEX) + }, + (&Method::GET, "/big_file.html") => { + // Stream a large file in chunks. This requires a + // little more overhead with two channels, (one for + // the response future, and a second for the response + // body), but can handle arbitrarily large files. + // + // We use an artificially small buffer, since we have + // a small test file. + let (tx, rx) = oneshot::channel(); + thread::spawn(move || { + let _file = match File::open(INDEX) { + Ok(f) => f, + Err(_) => { + tx.send(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(NOTFOUND.into()) + .unwrap()) + .expect("Send error on open"); + return; + }, + }; + let (_tx_body, rx_body) = Body::channel(); + let res = Response::new(rx_body.into()); + tx.send(res).expect("Send error on successful file read"); + /* TODO: fix once we have futures 0.2 Sink working + let mut buf = [0u8; 16]; + loop { + match file.read(&mut buf) { + Ok(n) => { + if n == 0 { + // eof + tx_body.close().expect("panic closing"); + break; + } else { + let chunk: Chunk = buf[0..n].to_vec().into(); + match tx_body.send_data(chunk).wait() { + Ok(t) => { tx_body = t; }, + Err(_) => { break; } + }; + } + }, + Err(_) => { break; } + } + } + */ + }); + + Box::new(rx.map_err(|e| io::Error::new(io::ErrorKind::Other, e))) + }, + (&Method::GET, "/no_file.html") => { + // Test what happens when file cannot be be found + simple_file_send("this_file_should_not_exist.html") + }, + _ => { + Box::new(future::ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()) + .unwrap())) + } + } + +} + +fn simple_file_send(f: &str) -> ResponseFuture { // Serve a file by reading it entirely into memory. As a result // this is limited to serving small files, but it is somewhat // simpler with a little less overhead. @@ -57,94 +142,3 @@ fn simple_file_send(f: &str) -> Box, Error = io::Er Box::new(rx.map_err(|e| io::Error::new(io::ErrorKind::Other, e))) } -struct ResponseExamples; - -impl Service for ResponseExamples { - type Request = Request; - type Response = Response; - type Error = io::Error; - type Future = Box + Send>; - - fn call(&self, req: Request) -> Self::Future { - match (req.method(), req.uri().path()) { - (&Method::GET, "/") | (&Method::GET, "/index.html") => { - simple_file_send(INDEX) - }, - (&Method::GET, "/big_file.html") => { - // Stream a large file in chunks. This requires a - // little more overhead with two channels, (one for - // the response future, and a second for the response - // body), but can handle arbitrarily large files. - // - // We use an artificially small buffer, since we have - // a small test file. - let (tx, rx) = oneshot::channel(); - thread::spawn(move || { - let _file = match File::open(INDEX) { - Ok(f) => f, - Err(_) => { - tx.send(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(NOTFOUND.into()) - .unwrap()) - .expect("Send error on open"); - return; - }, - }; - let (_tx_body, rx_body) = Body::channel(); - let res = Response::new(rx_body.into()); - tx.send(res).expect("Send error on successful file read"); - /* TODO: fix once we have futures 0.2 Sink working - let mut buf = [0u8; 16]; - loop { - match file.read(&mut buf) { - Ok(n) => { - if n == 0 { - // eof - tx_body.close().expect("panic closing"); - break; - } else { - let chunk: Chunk = buf[0..n].to_vec().into(); - match tx_body.send_data(chunk).wait() { - Ok(t) => { tx_body = t; }, - Err(_) => { break; } - }; - } - }, - Err(_) => { break; } - } - } - */ - }); - - Box::new(rx.map_err(|e| io::Error::new(io::ErrorKind::Other, e))) - }, - (&Method::GET, "/no_file.html") => { - // Test what happens when file cannot be be found - simple_file_send("this_file_should_not_exist.html") - }, - _ => { - Box::new(futures::future::ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::empty()) - .unwrap())) - } - } - } - -} - - -fn main() { - pretty_env_logger::init(); - - let addr = "127.0.0.1:1337".parse().unwrap(); - - let server = Server::bind(&addr) - .serve(|| Ok(ResponseExamples)) - .map_err(|e| eprintln!("server error: {}", e)); - - println!("Listening on http://{}", addr); - - tokio::run(server); -} diff --git a/examples/server.rs b/examples/server.rs index e96d77e705..b30b115473 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -5,37 +5,27 @@ extern crate pretty_env_logger; extern crate tokio; use futures::Future; -use futures::future::{FutureResult}; -use hyper::{Body, Method, Request, Response, StatusCode}; -use hyper::server::{Server, Service}; +use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use hyper::service::service_fn_ok; static INDEX: &'static [u8] = b"Try POST /echo"; -struct Echo; - -impl Service for Echo { - type Request = Request; - type Response = Response; - type Error = hyper::Error; - type Future = FutureResult; - - fn call(&self, req: Self::Request) -> Self::Future { - futures::future::ok(match (req.method(), req.uri().path()) { - (&Method::GET, "/") | (&Method::POST, "/") => { - Response::new(INDEX.into()) - }, - (&Method::POST, "/echo") => { - Response::new(req.into_body()) - }, - _ => { - let mut res = Response::new(Body::empty()); - *res.status_mut() = StatusCode::NOT_FOUND; - res - } - }) +// Using service_fn_ok, we can convert this function into a `Service`. +fn echo(req: Request) -> Response { + match (req.method(), req.uri().path()) { + (&Method::GET, "/") | (&Method::POST, "/") => { + Response::new(INDEX.into()) + }, + (&Method::POST, "/echo") => { + Response::new(req.into_body()) + }, + _ => { + let mut res = Response::new(Body::empty()); + *res.status_mut() = StatusCode::NOT_FOUND; + res + } } - } @@ -45,7 +35,7 @@ fn main() { let addr = ([127, 0, 0, 1], 1337).into(); let server = Server::bind(&addr) - .serve(|| Ok(Echo)) + .serve(|| service_fn_ok(echo)) .map_err(|e| eprintln!("server error: {}", e)); println!("Listening on http://{}", addr); diff --git a/examples/web_api.rs b/examples/web_api.rs index 597449552c..d5217b806a 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -4,12 +4,11 @@ extern crate hyper; extern crate pretty_env_logger; extern crate tokio; -use futures::{Future, Stream}; -use futures::future::lazy; +use futures::{future, Future, Stream}; -use hyper::{Body, Chunk, Client, Method, Request, Response, StatusCode}; +use hyper::{Body, Chunk, Client, Method, Request, Response, Server, StatusCode}; use hyper::client::HttpConnector; -use hyper::server::{Server, Service}; +use hyper::service::service_fn; #[allow(unused, deprecated)] use std::ascii::AsciiExt; @@ -19,69 +18,70 @@ static URL: &str = "http://127.0.0.1:1337/web_api"; static INDEX: &[u8] = b"test.html"; static LOWERCASE: &[u8] = b"i am a lower case string"; -struct ResponseExamples(Client); +fn response_examples(req: Request, client: &Client) + -> Box, Error=hyper::Error> + Send> +{ + match (req.method(), req.uri().path()) { + (&Method::GET, "/") | (&Method::GET, "/index.html") => { + let body = Body::from(INDEX); + Box::new(future::ok(Response::new(body))) + }, + (&Method::GET, "/test.html") => { + // Run a web query against the web api below + let req = Request::builder() + .method(Method::POST) + .uri(URL) + .body(LOWERCASE.into()) + .unwrap(); + let web_res_future = client.request(req); -impl Service for ResponseExamples { - type Request = Request; - type Response = Response; - type Error = hyper::Error; - type Future = Box + Send>; - - fn call(&self, req: Self::Request) -> Self::Future { - match (req.method(), req.uri().path()) { - (&Method::GET, "/") | (&Method::GET, "/index.html") => { - let body = Body::from(INDEX); - Box::new(futures::future::ok(Response::new(body))) - }, - (&Method::GET, "/test.html") => { - // Run a web query against the web api below - let req = Request::builder() - .method(Method::POST) - .uri(URL) - .body(LOWERCASE.into()) - .unwrap(); - let web_res_future = self.0.request(req); - - Box::new(web_res_future.map(|web_res| { - let body = Body::wrap_stream(web_res.into_body().map(|b| { - Chunk::from(format!("before: '{:?}'
after: '{:?}'", - std::str::from_utf8(LOWERCASE).unwrap(), - std::str::from_utf8(&b).unwrap())) - })); - Response::new(body) - })) - }, - (&Method::POST, "/web_api") => { - // A web api to run against. Simple upcasing of the body. - let body = Body::wrap_stream(req.into_body().map(|chunk| { - let upper = chunk.iter().map(|byte| byte.to_ascii_uppercase()) - .collect::>(); - Chunk::from(upper) + Box::new(web_res_future.map(|web_res| { + let body = Body::wrap_stream(web_res.into_body().map(|b| { + Chunk::from(format!("before: '{:?}'
after: '{:?}'", + std::str::from_utf8(LOWERCASE).unwrap(), + std::str::from_utf8(&b).unwrap())) })); - Box::new(futures::future::ok(Response::new(body))) - }, - _ => { - let body = Body::from(NOTFOUND); - Box::new(futures::future::ok(Response::builder() - .status(StatusCode::NOT_FOUND) - .body(body) - .unwrap())) - } + Response::new(body) + })) + }, + (&Method::POST, "/web_api") => { + // A web api to run against. Simple upcasing of the body. + let body = Body::wrap_stream(req.into_body().map(|chunk| { + let upper = chunk.iter().map(|byte| byte.to_ascii_uppercase()) + .collect::>(); + Chunk::from(upper) + })); + Box::new(future::ok(Response::new(body))) + }, + _ => { + let body = Body::from(NOTFOUND); + Box::new(future::ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(body) + .unwrap())) } } - } - fn main() { pretty_env_logger::init(); let addr = "127.0.0.1:1337".parse().unwrap(); - tokio::run(lazy(move || { + tokio::run(future::lazy(move || { + // Share a `Client` with all `Service`s let client = Client::new(); + + let new_service = move || { + // Move a clone of `client` into the `service_fn`. + let client = client.clone(); + service_fn(move |req| { + response_examples(req, &client) + }) + }; + let server = Server::bind(&addr) - .serve(move || Ok(ResponseExamples(client.clone()))) + .serve(new_service) .map_err(|e| eprintln!("server error: {}", e)); println!("Listening on http://{}", addr); @@ -89,3 +89,4 @@ fn main() { server })); } + diff --git a/src/client/mod.rs b/src/client/mod.rs index e14ae5d85f..c93da09f22 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -11,7 +11,6 @@ use futures::sync::oneshot; use http::{Method, Request, Response, Uri, Version}; use http::header::{Entry, HeaderValue, HOST}; use http::uri::Scheme; -pub use tokio_service::Service; use body::{Body, Payload}; use common::Exec; @@ -295,22 +294,6 @@ where C: Connect + Sync + 'static, } } -impl Service for Client -where C: Connect + 'static, - C::Future: 'static, - B: Payload + Send + 'static, - B::Data: Send, -{ - type Request = Request; - type Response = Response; - type Error = ::Error; - type Future = FutureResponse; - - fn call(&self, req: Self::Request) -> Self::Future { - self.request(req) - } -} - impl Clone for Client { fn clone(&self) -> Client { Client { diff --git a/src/common/mod.rs b/src/common/mod.rs index e47aacafa6..cd8ea4d5cb 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,6 +1,5 @@ mod exec; +mod never; pub(crate) use self::exec::Exec; - -#[derive(Debug)] -pub enum Never {} +pub use self::never::Never; diff --git a/src/common/never.rs b/src/common/never.rs new file mode 100644 index 0000000000..dfb763b2a4 --- /dev/null +++ b/src/common/never.rs @@ -0,0 +1,22 @@ +//! An uninhabitable type meaning it can never happen. +//! +//! To be replaced with `!` once it is stable. + +use std::error::Error; +use std::fmt; + +#[derive(Debug)] +pub enum Never {} + +impl fmt::Display for Never { + fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { + match *self {} + } +} + +impl Error for Never { + fn description(&self) -> &str { + match *self {} + } +} + diff --git a/src/error.rs b/src/error.rs index df8f02c847..fe2677702e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -203,8 +203,8 @@ impl Error { Error::new(Kind::UnsupportedRequestMethod, None) } - pub(crate) fn new_user_new_service(err: io::Error) -> Error { - Error::new(Kind::NewService, Some(Box::new(err))) + pub(crate) fn new_user_new_service>(cause: E) -> Error { + Error::new(Kind::NewService, Some(cause.into())) } pub(crate) fn new_user_service>(cause: E) -> Error { diff --git a/src/lib.rs b/src/lib.rs index 95c257d443..eda2fe3352 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,7 +30,6 @@ extern crate time; extern crate tokio; extern crate tokio_executor; #[macro_use] extern crate tokio_io; -extern crate tokio_service; extern crate want; #[cfg(all(test, feature = "nightly"))] @@ -62,3 +61,4 @@ pub mod error; mod headers; mod proto; pub mod server; +pub mod service; diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index a504dce826..b9bfacad33 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -2,10 +2,10 @@ use bytes::Bytes; use futures::{Async, Future, Poll, Stream}; use http::{Request, Response, StatusCode}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_service::Service; use body::{Body, Payload}; use proto::{BodyLength, Conn, Http1Transaction, MessageHead, RequestHead, RequestLine, ResponseHead}; +use service::Service; pub(crate) struct Dispatcher { conn: Conn, @@ -312,7 +312,7 @@ impl Server where S: Service { impl Dispatch for Server where - S: Service, Response=Response>, + S: Service, S::Error: Into>, Bs: Payload, { diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index c0958d0361..bc3ba74de2 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -5,10 +5,10 @@ use tokio_io::{AsyncRead, AsyncWrite}; use ::body::Payload; use ::common::Exec; -use ::server::Service; +use ::service::Service; use super::{PipeToSendStream, SendBuf}; -use ::{Body, Request, Response}; +use ::{Body, Response}; pub(crate) struct Server where @@ -39,7 +39,7 @@ where impl Server where T: AsyncRead + AsyncWrite, - S: Service, Response=Response>, + S: Service, S::Error: Into>, S::Future: Send + 'static, B: Payload, @@ -62,7 +62,7 @@ where impl Future for Server where T: AsyncRead + AsyncWrite, - S: Service, Response=Response>, + S: Service, S::Error: Into>, S::Future: Send + 'static, B: Payload, @@ -96,8 +96,8 @@ where fn poll_server(&mut self, service: &mut S, exec: &Exec) -> Poll<(), ::Error> where S: Service< - Request=Request, - Response=Response, + ReqBody=Body, + ResBody=B, >, S::Error: Into>, S::Future: Send + 'static, diff --git a/src/server/conn.rs b/src/server/conn.rs index 3e7b3a5438..40532adc49 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -23,7 +23,7 @@ use tokio::reactor::Handle; use common::Exec; use proto; use body::{Body, Payload}; -use super::{HyperService, NewService, Request, Response, Service}; +use service::{NewService, Service}; pub use super::tcp::AddrIncoming; @@ -44,7 +44,7 @@ pub struct Http { /// A stream mapping incoming IOs to new services. /// -/// Yields `Connection`s that are futures that should be put on a reactor. +/// Yields `Connecting`s that are futures that should be put on a reactor. #[must_use = "streams do nothing unless polled"] #[derive(Debug)] pub struct Serve { @@ -53,6 +53,18 @@ pub struct Serve { protocol: Http, } +/// A future binding a `Service` to a `Connection`. +/// +/// Wraps the future returned from `NewService` into one that returns +/// a `Connection`. +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct Connecting { + future: F, + io: Option, + protocol: Http, +} + #[must_use = "futures do nothing unless polled"] #[derive(Debug)] pub(super) struct SpawnAll { @@ -65,20 +77,19 @@ pub(super) struct SpawnAll { #[must_use = "futures do nothing unless polled"] pub struct Connection where - S: HyperService, - S::ResponseBody: Payload, + S: Service, { pub(super) conn: Either< proto::h1::Dispatcher< proto::h1::dispatch::Server, - S::ResponseBody, + S::ResBody, I, proto::ServerTransaction, >, proto::h2::Server< I, S, - S::ResponseBody, + S::ResBody, >, >, } @@ -163,7 +174,7 @@ impl Http { self } - /// Bind a connection together with a `Service`. + /// Bind a connection together with a [`Service`](::service::Service). /// /// This returns a Future that must be polled in order for HTTP to be /// driven on the connection. @@ -177,14 +188,14 @@ impl Http { /// # extern crate tokio_io; /// # use futures::Future; /// # use hyper::{Body, Request, Response}; - /// # use hyper::server::Service; + /// # use hyper::service::Service; /// # use hyper::server::conn::Http; /// # use tokio_io::{AsyncRead, AsyncWrite}; /// # use tokio::reactor::Handle; /// # fn run(some_io: I, some_service: S) /// # where /// # I: AsyncRead + AsyncWrite + Send + 'static, - /// # S: Service, Response=Response, Error=hyper::Error> + Send + 'static, + /// # S: Service + Send + 'static, /// # S::Future: Send /// # { /// let http = Http::new(); @@ -200,7 +211,7 @@ impl Http { /// ``` pub fn serve_connection(&self, io: I, service: S) -> Connection where - S: Service, Response = Response>, + S: Service, S::Error: Into>, S::Future: Send + 'static, Bd: Payload, @@ -235,7 +246,7 @@ impl Http { /// connection. pub fn serve_addr(&self, addr: &SocketAddr, new_service: S) -> ::Result> where - S: NewService, Response=Response>, + S: NewService, S::Error: Into>, Bd: Payload, { @@ -254,7 +265,7 @@ impl Http { /// connection. pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result> where - S: NewService, Response = Response>, + S: NewService, S::Error: Into>, Bd: Payload, { @@ -271,7 +282,7 @@ impl Http { I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite, - S: NewService, Response = Response>, + S: NewService, S::Error: Into>, Bd: Payload, { @@ -288,7 +299,7 @@ impl Http { impl Connection where - S: Service, Response=Response> + 'static, + S: Service + 'static, S::Error: Into>, S::Future: Send, I: AsyncRead + AsyncWrite + 'static, @@ -350,7 +361,7 @@ where impl Future for Connection where - S: Service, Response=Response> + 'static, + S: Service + 'static, S::Error: Into>, S::Future: Send, I: AsyncRead + AsyncWrite + 'static, @@ -366,8 +377,7 @@ where impl fmt::Debug for Connection where - S: HyperService, - S::ResponseBody: Payload, + S: Service, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Connection") @@ -403,24 +413,48 @@ where I: Stream, I::Item: AsyncRead + AsyncWrite, I::Error: Into>, - S: NewService, Response=Response>, + S: NewService, S::Error: Into>, - ::Future: Send + 'static, + ::Future: Send + 'static, B: Payload, { - type Item = Connection; + type Item = Connecting; type Error = ::Error; fn poll(&mut self) -> Poll, Self::Error> { if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) { - let service = self.new_service.new_service().map_err(::Error::new_user_new_service)?; - Ok(Async::Ready(Some(self.protocol.serve_connection(io, service)))) + let new_fut = self.new_service.new_service(); + Ok(Async::Ready(Some(Connecting { + future: new_fut, + io: Some(io), + protocol: self.protocol.clone(), + }))) } else { Ok(Async::Ready(None)) } } } +// ===== impl Connecting ===== + +impl Future for Connecting +where + I: AsyncRead + AsyncWrite, + F: Future, + S: Service, + S::Future: Send + 'static, + B: Payload, +{ + type Item = Connection; + type Error = F::Error; + + fn poll(&mut self) -> Poll { + let service = try_ready!(self.future.poll()); + let io = self.io.take().expect("polled after complete"); + Ok(self.protocol.serve_connection(io, service).into()) + } +} + // ===== impl SpawnAll ===== impl SpawnAll { @@ -440,10 +474,11 @@ where I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite + Send + 'static, - S: NewService, Response = Response> + Send + 'static, + S: NewService + Send + 'static, S::Error: Into>, - ::Instance: Send, - <::Instance as Service>::Future: Send + 'static, + S::Service: Send, + S::Future: Send + 'static, + ::Future: Send + 'static, B: Payload, { type Item = (); @@ -451,8 +486,11 @@ where fn poll(&mut self) -> Poll { loop { - if let Some(conn) = try_ready!(self.serve.poll()) { - let fut = conn + if let Some(connecting) = try_ready!(self.serve.poll()) { + let fut = connecting + .map_err(::Error::new_user_new_service) + // flatten basically + .and_then(|conn| conn) .map_err(|err| debug!("conn error: {}", err)); self.serve.protocol.exec.execute(fut); } else { diff --git a/src/server/mod.rs b/src/server/mod.rs index 942d87847b..cf5487a795 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -7,9 +7,47 @@ //! //! - The higher-level [`Server`](Server). //! - The lower-level [conn](conn) module. +//! +//! # Server +//! +//! The [`Server`](Server) is main way to start listening for HTTP requests. +//! It wraps a listener with a [`NewService`](::service), and then should +//! be executed to start serving requests. +//! +//! ## Example +//! +//! ```no_run +//! extern crate futures; +//! extern crate hyper; +//! extern crate tokio; +//! +//! use futures::Future; +//! use hyper::{Body, Response, Server}; +//! use hyper::service::service_fn_ok; +//! +//! fn main() { +//! // Construct our SocketAddr to listen on... +//! let addr = ([127, 0, 0, 1], 3000).into(); +//! +//! // And a NewService to handle each connection... +//! let new_service = || { +//! service_fn_ok(|_req| { +//! Response::new(Body::from("Hello World")) +//! }) +//! }; +//! +//! // Then bind and serve... +//! let server = Server::bind(&addr) +//! .serve(new_service); +//! +//! // Finally, spawn `server` onto an Executor... +//! tokio::run(server.map_err(|e| { +//! eprintln!("server error: {}", e); +//! })); +//! } +//! ``` pub mod conn; -mod service; mod tcp; use std::fmt; @@ -17,19 +55,16 @@ use std::net::SocketAddr; use std::time::Duration; use futures::{Future, Stream, Poll}; -use http::{Request, Response}; use tokio_io::{AsyncRead, AsyncWrite}; -pub use tokio_service::{NewService, Service}; use body::{Body, Payload}; +use service::{NewService, Service}; // Renamed `Http` as `Http_` for now so that people upgrading don't see an // error that `hyper::server::Http` is private... use self::conn::{Http as Http_, SpawnAll}; -use self::hyper_service::HyperService; +//use self::hyper_service::HyperService; use self::tcp::{AddrIncoming}; -pub use self::service::{const_service, service_fn}; - /// A listening HTTP server. /// /// `Server` is a `Future` mapping a bound listener with a set of service @@ -93,10 +128,11 @@ where I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite + Send + 'static, - S: NewService, Response = Response> + Send + 'static, + S: NewService + Send + 'static, S::Error: Into>, - ::Instance: Send, - <::Instance as Service>::Future: Send + 'static, + S::Service: Send, + S::Future: Send + 'static, + ::Future: Send + 'static, B: Payload, { type Item = (); @@ -137,15 +173,38 @@ impl Builder { } /// Consume this `Builder`, creating a [`Server`](Server). + /// + /// # Example + /// + /// ```rust + /// use hyper::{Body, Response, Server}; + /// use hyper::service::service_fn_ok; + /// + /// // Construct our SocketAddr to listen on... + /// let addr = ([127, 0, 0, 1], 3000).into(); + /// + /// // And a NewService to handle each connection... + /// let new_service = || { + /// service_fn_ok(|_req| { + /// Response::new(Body::from("Hello World")) + /// }) + /// }; + /// + /// // Then bind and serve... + /// let server = Server::bind(&addr) + /// .serve(new_service); + /// + /// // Finally, spawn `server` onto an Executor... + /// ``` pub fn serve(self, new_service: S) -> Server where I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite + Send + 'static, - S: NewService, Response = Response>, + S: NewService + Send + 'static, S::Error: Into>, - ::Instance: Send, - <::Instance as Service>::Future: Send + 'static, + S::Service: Send, + ::Future: Send + 'static, B: Payload, { let serve = self.protocol.serve_incoming(self.incoming, new_service); @@ -174,52 +233,3 @@ impl Builder { } } -mod hyper_service { - use super::{Body, Payload, Request, Response, Service}; - /// A "trait alias" for any type that implements `Service` with hyper's - /// Request, Response, and Error types, and a streaming body. - /// - /// There is an auto implementation inside hyper, so no one can actually - /// implement this trait. It simply exists to reduce the amount of generics - /// needed. - pub trait HyperService: Service + Sealed { - #[doc(hidden)] - type ResponseBody; - #[doc(hidden)] - type Sealed: Sealed2; - } - - pub trait Sealed {} - pub trait Sealed2 {} - - #[allow(missing_debug_implementations)] - pub struct Opaque { - _inner: (), - } - - impl Sealed2 for Opaque {} - - impl Sealed for S - where - S: Service< - Request=Request, - Response=Response, - >, - S::Error: Into>, - B: Payload, - {} - - impl HyperService for S - where - S: Service< - Request=Request, - Response=Response, - >, - S::Error: Into>, - S: Sealed, - B: Payload, - { - type ResponseBody = B; - type Sealed = Opaque; - } -} diff --git a/src/server/service.rs b/src/server/service.rs deleted file mode 100644 index 20c9812e35..0000000000 --- a/src/server/service.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::marker::PhantomData; -use std::sync::Arc; - -use futures::IntoFuture; -use tokio_service::{NewService, Service}; - -/// Create a `Service` from a function. -pub fn service_fn(f: F) -> ServiceFn -where - F: Fn(R) -> S, - S: IntoFuture, -{ - ServiceFn { - f: f, - _req: PhantomData, - } -} - -/// Create a `NewService` by sharing references of `service. -pub fn const_service(service: S) -> ConstService { - ConstService { - svc: Arc::new(service), - } -} - -#[derive(Debug)] -pub struct ServiceFn { - f: F, - _req: PhantomData R>, -} - -impl Service for ServiceFn -where - F: Fn(R) -> S, - S: IntoFuture, -{ - type Request = R; - type Response = S::Item; - type Error = S::Error; - type Future = S::Future; - - fn call(&self, req: Self::Request) -> Self::Future { - (self.f)(req).into_future() - } -} - -#[derive(Debug)] -pub struct ConstService { - svc: Arc, -} - -impl NewService for ConstService -where - S: Service, -{ - type Request = S::Request; - type Response = S::Response; - type Error = S::Error; - type Instance = Arc; - - fn new_service(&self) -> ::std::io::Result { - Ok(self.svc.clone()) - } -} diff --git a/src/service/mod.rs b/src/service/mod.rs new file mode 100644 index 0000000000..534519df9f --- /dev/null +++ b/src/service/mod.rs @@ -0,0 +1,35 @@ +//! Services and NewServices +//! +//! - A [`Service`](Service) is a trait representing an asynchronous function +//! of a request to a response. It's similar to +//! `async fn(Request) -> Result`. +//! - A [`NewService`](NewService) is a trait creating specific instances of a +//! `Service`. +//! +//! These types are conceptually similar to those in +//! [tower](https://crates.io/crates/tower), while being specific to hyper. +//! +//! # Service +//! +//! In hyper, especially in the server setting, a `Service` is usually bound +//! to a single connection. It defines how to respond to **all** requests that +//! connection will receive. +//! +//! While it's possible to implement `Service` for a type manually, the helpers +//! [`service_fn`](service_fn) and [`service_fn_ok`](service_fn_ok) should be +//! sufficient for most cases. +//! +//! # NewService +//! +//! Since a `Service` is bound to a single connection, a [`Server`](::Server) +//! needs a way to make them as it accepts connections. This is what a +//! `NewService` does. +//! +//! Resources that need to be shared by all `Service`s can be put into a +//! `NewService`, and then passed to individual `Service`s when `new_service` +//! is called. +mod new_service; +mod service; + +pub use self::new_service::{NewService}; +pub use self::service::{service_fn, service_fn_ok, Service}; diff --git a/src/service/new_service.rs b/src/service/new_service.rs new file mode 100644 index 0000000000..ce90a69b99 --- /dev/null +++ b/src/service/new_service.rs @@ -0,0 +1,55 @@ +use std::error::Error as StdError; + +use futures::{Future, IntoFuture}; + +use body::Payload; +use super::Service; + +/// An asynchronous constructor of `Service`s. +pub trait NewService { + /// The `Payload` body of the `http::Request`. + type ReqBody: Payload; + + /// The `Payload` body of the `http::Response`. + type ResBody: Payload; + + /// The error type that can be returned by `Service`s. + type Error: Into>; + + /// The resolved `Service` from `new_service()`. + type Service: Service< + ReqBody=Self::ReqBody, + ResBody=Self::ResBody, + Error=Self::Error, + >; + + /// The future returned from `new_service` of a `Service`. + type Future: Future; + + /// The error type that can be returned when creating a new `Service. + type InitError: Into>; + + /// Create a new `Service`. + fn new_service(&self) -> Self::Future; +} + +impl NewService for F +where + F: Fn() -> R, + R: IntoFuture, + R::Error: Into>, + S: Service, +{ + type ReqBody = S::ReqBody; + type ResBody = S::ResBody; + type Error = S::Error; + type Service = S; + type Future = R::Future; + type InitError = R::Error; + + + fn new_service(&self) -> Self::Future { + (*self)().into_future() + } +} + diff --git a/src/service/service.rs b/src/service/service.rs new file mode 100644 index 0000000000..1c54b4eb2e --- /dev/null +++ b/src/service/service.rs @@ -0,0 +1,165 @@ +use std::error::Error as StdError; +use std::fmt; +use std::marker::PhantomData; + +use futures::{future, Future, IntoFuture}; + +use body::Payload; +use common::Never; +use ::{Request, Response}; + +/// An asynchronous function from `Request` to `Response`. +pub trait Service { + /// The `Payload` body of the `http::Request`. + type ReqBody: Payload; + + /// The `Payload` body of the `http::Response`. + type ResBody: Payload; + + /// The error type that can occur within this `Service. + /// + /// Note: Returning an `Error` to a hyper server will cause the connection + /// to be abruptly aborted. In most cases, it is better to return a `Response` + /// with a 4xx or 5xx status code. + type Error: Into>; + + /// The `Future` returned by this `Service`. + type Future: Future, Error=Self::Error>; + + /// Calls this `Service` with a request, returning a `Future` of the response. + fn call(&mut self, req: Request) -> Self::Future; +} + + +/// Create a `Service` from a function. +/// +/// # Example +/// +/// ```rust +/// use hyper::{Body, Request, Response, Version}; +/// use hyper::service::service_fn; +/// +/// let service = service_fn(|req: Request| { +/// if req.version() == Version::HTTP_11 { +/// Ok(Response::new(Body::from("Hello World"))) +/// } else { +/// // Note: it's usually better to return a Response +/// // with an appropriate StatusCode instead of an Err. +/// Err("not HTTP/1.1, abort connection") +/// } +/// }); +/// ``` +pub fn service_fn(f: F) -> ServiceFn +where + F: FnMut(Request) -> S, + S: IntoFuture, +{ + ServiceFn { + f, + _req: PhantomData, + } +} + +/// Create a `Service` from a function that never errors. +/// +/// # Example +/// +/// ```rust +/// use hyper::{Body, Request, Response}; +/// use hyper::service::service_fn_ok; +/// +/// let service = service_fn_ok(|req: Request| { +/// println!("request: {} {}", req.method(), req.uri()); +/// Response::new(Body::from("Hello World")) +/// }); +/// ``` +pub fn service_fn_ok(f: F) -> ServiceFnOk +where + F: FnMut(Request) -> Response, + S: Payload, +{ + ServiceFnOk { + f, + _req: PhantomData, + } +} + +// Not exported from crate as this will likely be replaced with `impl Service`. +pub struct ServiceFn { + f: F, + _req: PhantomData, +} + +impl Service for ServiceFn +where + F: FnMut(Request) -> Ret, + ReqBody: Payload, + Ret: IntoFuture>, + Ret::Error: Into>, + ResBody: Payload, +{ + type ReqBody = ReqBody; + type ResBody = ResBody; + type Error = Ret::Error; + type Future = Ret::Future; + + fn call(&mut self, req: Request) -> Self::Future { + (self.f)(req).into_future() + } +} + +impl IntoFuture for ServiceFn { + type Future = future::FutureResult; + type Item = Self; + type Error = Never; + + fn into_future(self) -> Self::Future { + future::ok(self) + } +} + +impl fmt::Debug for ServiceFn { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("impl Service") + .finish() + } +} + +// Not exported from crate as this will likely be replaced with `impl Service`. +pub struct ServiceFnOk { + f: F, + _req: PhantomData, +} + +impl Service for ServiceFnOk +where + F: FnMut(Request) -> Response, + ReqBody: Payload, + ResBody: Payload, +{ + type ReqBody = ReqBody; + type ResBody = ResBody; + type Error = Never; + type Future = future::FutureResult, Never>; + + fn call(&mut self, req: Request) -> Self::Future { + future::ok((self.f)(req)) + } +} + +impl IntoFuture for ServiceFnOk { + type Future = future::FutureResult; + type Item = Self; + type Error = Never; + + fn into_future(self) -> Self::Future { + future::ok(self) + } +} + +impl fmt::Debug for ServiceFnOk { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("impl Service") + .finish() + } +} diff --git a/tests/server.rs b/tests/server.rs index a1ac357459..b8020f08c7 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -31,8 +31,8 @@ use tokio_io::{AsyncRead, AsyncWrite}; use hyper::{Body, Request, Response, StatusCode}; -use hyper::server::{Service, NewService, service_fn}; use hyper::server::conn::Http; +use hyper::service::{service_fn, Service}; fn tcp_bind(addr: &SocketAddr, handle: &Handle) -> ::tokio::io::Result { let std_listener = StdTcpListener::bind(addr).unwrap(); @@ -95,28 +95,17 @@ fn get_implicitly_empty() { .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let socket = item.unwrap(); - Http::new().serve_connection(socket, GetImplicitlyEmpty) + Http::new().serve_connection(socket, service_fn(|req: Request| { + req.into_body() + .concat2() + .map(|buf| { + assert!(buf.is_empty()); + Response::new(Body::empty()) + }) + })) }); fut.wait().unwrap(); - - struct GetImplicitlyEmpty; - - impl Service for GetImplicitlyEmpty { - type Request = Request; - type Response = Response; - type Error = hyper::Error; - type Future = Box + Send>; - - fn call(&self, req: Request) -> Self::Future { - Box::new(req.into_body() - .concat2() - .map(|buf| { - assert!(buf.is_empty()); - Response::new(Body::empty()) - })) - } - } } mod response_body_lengths { @@ -1258,24 +1247,9 @@ enum Msg { End, } -impl NewService for TestService { - type Request = Request; - type Response = Response; - type Error = hyper::Error; - - type Instance = TestService; - - fn new_service(&self) -> std::io::Result { - Ok(self.clone()) - } -} - -impl Service for TestService { - type Request = Request; - type Response = Response; - type Error = hyper::Error; - type Future = Box, Error=hyper::Error> + Send>; - fn call(&self, req: Request) -> Self::Future { +impl TestService { + // Box is needed until we can return `impl Future` from a fn + fn call(&self, req: Request) -> Box, Error=hyper::Error> + Send> { let tx1 = self.tx.clone(); let tx2 = self.tx.clone(); @@ -1309,7 +1283,6 @@ impl Service for TestService { res })) } - } const HELLO: &'static str = "hello"; @@ -1317,12 +1290,12 @@ const HELLO: &'static str = "hello"; struct HelloWorld; impl Service for HelloWorld { - type Request = Request; - type Response = Response; + type ReqBody = Body; + type ResBody = Body; type Error = hyper::Error; - type Future = FutureResult; + type Future = FutureResult, Self::Error>; - fn call(&self, _req: Request) -> Self::Future { + fn call(&mut self, _req: Request) -> Self::Future { let response = Response::new(HELLO.into()); future::ok(response) } @@ -1376,10 +1349,13 @@ fn serve_with_options(options: ServeOptions) -> Serve { let serve = Http::new() .keep_alive(keep_alive) .pipeline_flush(pipeline) - .serve_addr(&addr, TestService { - tx: Arc::new(Mutex::new(msg_tx.clone())), - _timeout: dur, - reply: reply_rx, + .serve_addr(&addr, move || { + let ts = TestService { + tx: Arc::new(Mutex::new(msg_tx.clone())), + _timeout: dur, + reply: reply_rx.clone(), + }; + service_fn(move |req| ts.call(req)) }) .expect("bind to address"); @@ -1390,10 +1366,12 @@ fn serve_with_options(options: ServeOptions) -> Serve { ).expect("server addr tx"); // spawn_all() is private for now, so just duplicate it here - let spawn_all = serve.for_each(|conn| { - tokio::spawn(conn.map_err(|e| { - println!("server error: {}", e); - })); + let spawn_all = serve.for_each(|connecting| { + let fut = connecting + .map_err(|never| -> hyper::Error { match never {} }) + .flatten() + .map_err(|e| println!("server error: {}", e)); + tokio::spawn(fut); Ok(()) }).map_err(|e| { println!("accept error: {}", e) diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 507aa9f184..4d461356d4 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -187,7 +187,7 @@ pub fn __run_test(cfg: __TestConfig) { extern crate pretty_env_logger; use hyper::{Body, Client, Request, Response}; use hyper::client::HttpConnector; - use std::sync::Arc; + use std::sync::{Arc, Mutex}; let _ = pretty_env_logger::try_init(); let rt = Runtime::new().expect("new rt"); let handle = rt.reactor().clone(); @@ -198,37 +198,40 @@ pub fn __run_test(cfg: __TestConfig) { .executor(rt.executor()) .build::<_, Body>(connector); - let serve_handles = ::std::sync::Mutex::new( + let serve_handles = Arc::new(Mutex::new( cfg.server_msgs - ); - let service = hyper::server::service_fn(move |req: Request| -> Box, Error=hyper::Error> + Send> { - let (sreq, sres) = serve_handles.lock() - .unwrap() - .remove(0); + )); + let new_service = move || { + // Move a clone into the service_fn + let serve_handles = serve_handles.clone(); + hyper::service::service_fn(move |req: Request| { + let (sreq, sres) = serve_handles.lock() + .unwrap() + .remove(0); - assert_eq!(req.uri().path(), sreq.uri); - assert_eq!(req.method(), &sreq.method); - for (name, value) in &sreq.headers { - assert_eq!( - req.headers()[name], - value - ); - } - let sbody = sreq.body; - Box::new(req.into_body() - .concat2() - .map(move |body| { - assert_eq!(body.as_ref(), sbody.as_slice()); + assert_eq!(req.uri().path(), sreq.uri); + assert_eq!(req.method(), &sreq.method); + for (name, value) in &sreq.headers { + assert_eq!( + req.headers()[name], + value + ); + } + let sbody = sreq.body; + req.into_body() + .concat2() + .map(move |body| { + assert_eq!(body.as_ref(), sbody.as_slice()); - let mut res = Response::builder() - .status(sres.status) - .body(sres.body.into()) - .expect("Response::build"); - *res.headers_mut() = sres.headers; - res - })) - }); - let new_service = hyper::server::const_service(service); + let mut res = Response::builder() + .status(sres.status) + .body(Body::from(sres.body)) + .expect("Response::build"); + *res.headers_mut() = sres.headers; + res + }) + }) + }; let serve = hyper::server::conn::Http::new() .http2_only(cfg.server_version == 2) @@ -246,8 +249,12 @@ pub fn __run_test(cfg: __TestConfig) { let (success_tx, success_rx) = oneshot::channel(); let expected_connections = cfg.connections; let server = serve - .fold(0, move |cnt, conn| { - exe.spawn(conn.map_err(|e| panic!("server connection error: {}", e))); + .fold(0, move |cnt, connecting| { + let fut = connecting + .map_err(|never| -> hyper::Error { match never {} }) + .flatten() + .map_err(|e| panic!("server connection error: {}", e)); + exe.spawn(fut); Ok::<_, hyper::Error>(cnt + 1) }) .map(move |cnt| {