From 16b04fa9a334bc90895b970bb0e0bb69baba28df Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 6 Apr 2018 13:57:03 -0700 Subject: [PATCH] feat(error): revamp `hyper::Error` type **The `Error` is now an opaque struct**, which allows for more variants to be added freely, and the internal representation to change without being breaking changes. For inspecting an `Error`, there are several `is_*` methods to check for certain classes of errors, such as `Error::is_parse()`. The `cause` can also be inspected, like before. This likely seems like a downgrade, but more inspection can be added as needed! The `Error` now knows about more states, which gives much more context around when a certain error occurs. This is also expressed in the description and `fmt` messages. **Most places where a user would provide an error to hyper can now pass any error type** (`E: Into>`). This error is passed back in relevant places, and can be useful for logging. This should make it much clearer about what error a user should provide to hyper: any it feels is relevant! Closes #1128 Closes #1130 Closes #1431 Closes #1338 BREAKING CHANGE: `Error` is no longer an enum to pattern match over, or to construct. Code will need to be updated accordingly. For body streams or `Service`s, inference might be unable to determine what error type you mean to return. Starting in Rust 1.26, you could just label that as `!` if you never return an error. --- examples/client.rs | 5 +- examples/hello.rs | 3 +- examples/send_file.rs | 9 +- src/client/conn.rs | 24 +-- src/client/connect.rs | 2 +- src/client/dispatch.rs | 13 +- src/client/mod.rs | 51 ++--- src/client/pool.rs | 24 ++- src/client/tests.rs | 12 +- src/error.rs | 433 +++++++++++++++++++++++---------------- src/proto/body.rs | 18 +- src/proto/h1/conn.rs | 7 +- src/proto/h1/dispatch.rs | 49 ++--- src/proto/h1/io.rs | 8 +- src/proto/h1/mod.rs | 4 +- src/proto/h1/role.rs | 35 ++-- src/proto/mod.rs | 12 +- src/server/conn.rs | 20 +- src/server/mod.rs | 107 ++++++---- tests/client.rs | 118 +++++------ tests/server.rs | 8 +- 21 files changed, 536 insertions(+), 426 deletions(-) diff --git a/examples/client.rs b/examples/client.rs index 53507e81bd..b664cb611e 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -40,8 +40,9 @@ fn main() { println!("Response: {}", res.status()); println!("Headers: {:#?}", res.headers()); - res.into_parts().1.into_stream().for_each(|chunk| { - io::stdout().write_all(&chunk).map_err(From::from) + res.into_body().into_stream().for_each(|chunk| { + io::stdout().write_all(&chunk) + .map_err(|e| panic!("example expects stdout is open, error={}", e)) }) }).then(|result| { if let Some(err) = result.err() { diff --git a/examples/hello.rs b/examples/hello.rs index b4195d5d6e..318351f397 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -17,7 +17,8 @@ fn main() { let addr = ([127, 0, 0, 1], 3000).into(); let new_service = const_service(service_fn(|_| { - Ok(Response::new(Body::from(PHRASE))) + //TODO: when `!` is stable, replace error type + Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE))) })); tokio::runtime::run2(lazy(move |_| { diff --git a/examples/send_file.rs b/examples/send_file.rs index ad0cdc12d9..43f7129553 100644 --- a/examples/send_file.rs +++ b/examples/send_file.rs @@ -9,7 +9,6 @@ use futures::future::lazy; use futures::channel::oneshot; use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode}; -use hyper::error::Error; use hyper::server::{Http, Service}; use std::fs::File; @@ -19,7 +18,7 @@ use std::thread; static NOTFOUND: &[u8] = b"Not Found"; static INDEX: &str = "examples/send_file_index.html"; -fn simple_file_send(f: &str) -> Box, Error = hyper::Error> + Send> { +fn simple_file_send(f: &str) -> Box, Error = io::Error> + Send> { // Serve a file by reading it entirely into memory. As a result // this is limited to serving small files, but it is somewhat // simpler with a little less overhead. @@ -56,7 +55,7 @@ fn simple_file_send(f: &str) -> Box, Error = hyper: }; }); - Box::new(rx.map_err(|e| Error::from(io::Error::new(io::ErrorKind::Other, e)))) + Box::new(rx.map_err(|e| io::Error::new(io::ErrorKind::Other, e))) } struct ResponseExamples; @@ -64,7 +63,7 @@ struct ResponseExamples; impl Service for ResponseExamples { type Request = Request; type Response = Response; - type Error = hyper::Error; + type Error = io::Error; type Future = Box + Send>; fn call(&self, req: Request) -> Self::Future { @@ -119,7 +118,7 @@ impl Service for ResponseExamples { */ }); - Box::new(rx.map_err(|e| Error::from(io::Error::new(io::ErrorKind::Other, e)))) + Box::new(rx.map_err(|e| io::Error::new(io::ErrorKind::Other, e))) }, (&Method::GET, "/no_file.html") => { // Test what happens when file cannot be be found diff --git a/src/client/conn.rs b/src/client/conn.rs index c01b24e488..9b2475b648 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -46,7 +46,7 @@ pub struct SendRequest { pub struct Connection where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Entity + 'static, { inner: proto::dispatch::Dispatcher< proto::dispatch::Client, @@ -139,7 +139,7 @@ impl SendRequest impl SendRequest where - B: Entity + 'static, + B: Entity + 'static, { /// Sends a `Request` on the associated connection. /// @@ -263,7 +263,7 @@ impl fmt::Debug for SendRequest { impl Connection where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Entity + 'static, { /// Return the inner IO object, and additional information. pub fn into_parts(self) -> Parts { @@ -290,7 +290,7 @@ where impl Future for Connection where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Entity + 'static, { type Item = (); type Error = ::Error; @@ -303,7 +303,7 @@ where impl fmt::Debug for Connection where T: AsyncRead + AsyncWrite + fmt::Debug, - B: Entity + 'static, + B: Entity + 'static, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Connection") @@ -332,7 +332,7 @@ impl Builder { pub fn handshake(&self, io: T) -> Handshake where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Entity + 'static, { Handshake { inner: HandshakeInner { @@ -346,7 +346,7 @@ impl Builder { pub(super) fn handshake_no_upgrades(&self, io: T) -> HandshakeNoUpgrades where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Entity + 'static, { HandshakeNoUpgrades { inner: HandshakeInner { @@ -363,7 +363,7 @@ impl Builder { impl Future for Handshake where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Entity + 'static, { type Item = (SendRequest, Connection); type Error = ::Error; @@ -388,7 +388,7 @@ impl fmt::Debug for Handshake { impl Future for HandshakeNoUpgrades where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Entity + 'static, { type Item = (SendRequest, proto::dispatch::Dispatcher< proto::dispatch::Client, @@ -407,7 +407,7 @@ where impl Future for HandshakeInner where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Entity + 'static, R: proto::Http1Transaction< Incoming=StatusCode, Outgoing=proto::RequestLine, @@ -471,7 +471,7 @@ impl AssertSendSync for SendRequest {} impl AssertSend for Connection where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Entity + 'static, B::Data: Send + 'static, {} @@ -479,7 +479,7 @@ where impl AssertSendSync for Connection where T: AsyncRead + AsyncWrite, - B: Entity + 'static, + B: Entity + 'static, B::Data: Send + Sync + 'static, {} diff --git a/src/client/connect.rs b/src/client/connect.rs index ad5ab7b518..bd95e8adbe 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -33,7 +33,7 @@ pub trait Connect: Send + Sync { /// The connected IO Stream. type Transport: AsyncRead + AsyncWrite + Send + 'static; /// An error occured when trying to connect. - type Error; + type Error: Into>; /// A Future that will resolve to the connected Transport. type Future: Future + Send; /// Connect to a destination. diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 98c591521e..6e176f2b76 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -38,10 +38,10 @@ impl Sender { // there's room in the queue, but does the Connection // want a message yet? self.giver.poll_want(cx) - .map_err(|_| ::Error::Closed) + .map_err(|_| ::Error::new_closed()) }, Ok(Async::Pending) => Ok(Async::Pending), - Err(_) => Err(::Error::Closed), + Err(_) => Err(::Error::new_closed()), } } @@ -183,9 +183,12 @@ mod tests { drop(rx); promise.then(|fulfilled| { - let res = fulfilled.expect("fulfilled"); - match res.unwrap_err() { - (::Error::Cancel(_), Some(_)) => (), + let err = fulfilled + .expect("fulfilled") + .expect_err("promise should error"); + + match (err.0.kind(), err.1) { + (&::error::Kind::Canceled, Some(_)) => (), e => panic!("expected Error::Cancel(_), found {:?}", e), } diff --git a/src/client/mod.rs b/src/client/mod.rs index 326db000f9..e53cbd70b9 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -6,7 +6,7 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; -use futures::{Async, Future, FutureExt, Never, Poll}; +use futures::{Async, Future, FutureExt, Poll}; use futures::channel::oneshot; use futures::future; use futures::task; @@ -93,10 +93,10 @@ impl Client { } impl Client -where C: Connect + Sync + 'static, +where C: Connect + Sync + 'static, C::Transport: 'static, C::Future: 'static, - B: Entity + Send + 'static, + B: Entity + Send + 'static, B::Data: Send, { @@ -128,13 +128,14 @@ where C: Connect + Sync + 'static, Version::HTTP_11 => (), other => { error!("Request has unsupported version \"{:?}\"", other); - return FutureResponse(Box::new(future::err(::Error::Version))); + //TODO: replace this with a proper variant + return FutureResponse(Box::new(future::err(::Error::new_user_unsupported_version()))); } } if req.method() == &Method::CONNECT { debug!("Client does not support CONNECT requests"); - return FutureResponse(Box::new(future::err(::Error::Method))); + return FutureResponse(Box::new(future::err(::Error::new_user_unsupported_request_method()))); } let uri = req.uri().clone(); @@ -143,7 +144,8 @@ where C: Connect + Sync + 'static, format!("{}://{}", scheme, auth) } _ => { - return FutureResponse(Box::new(future::err(::Error::Io( + //TODO: replace this with a proper variant + return FutureResponse(Box::new(future::err(::Error::new_io( io::Error::new( io::ErrorKind::InvalidInput, "invalid URI for Client Request" @@ -190,16 +192,16 @@ where C: Connect + Sync + 'static, }; future::lazy(move |_| { connector.connect(dst) - .err_into() + .map_err(::Error::new_connect) .and_then(move |(io, connected)| { conn::Builder::new() .h1_writev(h1_writev) .handshake_no_upgrades(io) .and_then(move |(tx, conn)| { future::lazy(move |cx| { - execute(conn.recover(|e| { + cx.spawn(conn.recover(|e| { debug!("client connection error: {}", e); - }), cx)?; + })); Ok(pool.pooled(pool_key, PoolClient { is_proxied: connected.is_proxied, tx: tx, @@ -251,8 +253,6 @@ where C: Connect + Sync + 'static, } else if !res.body().is_empty() { let (delayed_tx, delayed_rx) = oneshot::channel(); res.body_mut().delayed_eof(delayed_rx); - // If the executor doesn't have room, oh well. Things will likely - // be blowing up soon, but this specific task isn't required. let fut = future::poll_fn(move |cx| { pooled.tx.poll_ready(cx) }) @@ -262,7 +262,7 @@ where C: Connect + Sync + 'static, drop(delayed_tx); Ok(()) }); - execute(fut, cx).ok(); + cx.spawn(fut); } Ok(res) }) @@ -277,9 +277,9 @@ where C: Connect + Sync + 'static, } impl Service for Client -where C: Connect + 'static, +where C: Connect + 'static, C::Future: 'static, - B: Entity + Send + 'static, + B: Entity + Send + 'static, B::Data: Send, { type Request = Request; @@ -339,9 +339,9 @@ struct RetryableSendRequest { impl Future for RetryableSendRequest where - C: Connect + 'static, + C: Connect + 'static, C::Future: 'static, - B: Entity + Send + 'static, + B: Entity + Send + 'static, B::Data: Send, { type Item = Response; @@ -550,10 +550,10 @@ impl Config { } impl Config -where C: Connect, +where C: Connect, C::Transport: 'static, C::Future: 'static, - B: Entity + Send, + B: Entity + Send, B::Data: Send, { /// Construct the Client with this configuration. @@ -565,7 +565,7 @@ where C: Connect, impl Config where - B: Entity + Send, + B: Entity + Send, B::Data: Send, { /// Construct the Client with this configuration. @@ -600,16 +600,3 @@ impl Clone for Config { } } - -fn execute(fut: F, cx: &mut task::Context) -> Result<(), ::Error> - where F: Future + Send + 'static, -{ - if let Some(executor) = cx.executor() { - executor.spawn(Box::new(fut)).map_err(|err| { - debug!("executor error: {:?}", err); - ::Error::Executor - }) - } else { - Err(::Error::Executor) - } -} diff --git a/src/client/pool.rs b/src/client/pool.rs index 4fb077ceda..24717be087 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -218,16 +218,22 @@ impl PoolInner { impl Pool { - fn spawn_expired_interval(&mut self, cx: &mut task::Context) -> Result<(), ::Error> { + fn spawn_expired_interval(&mut self, cx: &mut task::Context) { + let exe = if let Some(exe) = cx.executor() { + exe + } else { + return + }; + let (dur, rx) = { let mut inner = self.inner.lock().unwrap(); if !inner.enabled { - return Ok(()); + return } if inner.idle_interval_ref.is_some() { - return Ok(()); + return } if let Some(dur) = inner.timeout { @@ -235,16 +241,17 @@ impl Pool { inner.idle_interval_ref = Some(tx); (dur, rx) } else { - return Ok(()); + return } }; let interval = Interval::new(dur); - super::execute(IdleInterval { + // This task isn't essential, so don't panic if spawn fails. + let _ = exe.spawn(Box::new(IdleInterval { interval: interval, pool: Arc::downgrade(&self.inner), pool_drop_notifier: rx, - }, cx) + })); } } @@ -363,7 +370,7 @@ impl Future for Checkout { fn poll(&mut self, cx: &mut task::Context) -> Poll { if !self.spawned_expired_interval { - self.pool.spawn_expired_interval(cx)?; + self.pool.spawn_expired_interval(cx); self.spawned_expired_interval = true; } @@ -515,7 +522,8 @@ mod tests { let mut pool = Pool::new(true, Some(Duration::from_millis(100))); block_on(future::lazy(|cx| { - pool.spawn_expired_interval(cx) + pool.spawn_expired_interval(cx); + Ok::<_, ()>(()) })).unwrap(); let key = Arc::new("foo".to_string()); diff --git a/src/client/tests.rs b/src/client/tests.rs index 2edf2d1124..5b4c1f52f3 100644 --- a/src/client/tests.rs +++ b/src/client/tests.rs @@ -36,7 +36,7 @@ fn retryable_request() { try_ready!(sock1.read(cx, &mut [0u8; 512])); try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); Ok(Async::Ready(())) - }); + }).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e)); block_on(res1.join(srv1)).expect("res1"); } drop(sock1); @@ -53,7 +53,7 @@ fn retryable_request() { try_ready!(sock2.read(cx, &mut [0u8; 512])); try_ready!(sock2.write(b"HTTP/1.1 222 OK\r\nContent-Length: 0\r\n\r\n")); Ok(Async::Ready(())) - }); + }).map_err(|e: ::std::io::Error| panic!("srv2 poll_fn error: {}", e)); block_on(res2.join(srv2)).expect("res2"); } @@ -83,7 +83,7 @@ fn conn_reset_after_write() { try_ready!(sock1.read(cx, &mut [0u8; 512])); try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); Ok(Async::Ready(())) - }); + }).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e)); block_on(res1.join(srv1)).expect("res1"); } @@ -106,10 +106,10 @@ fn conn_reset_after_write() { try_ready!(sock1.as_mut().unwrap().read(cx, &mut [0u8; 512])); sock1.take(); Ok(Async::Ready(())) - }); + }).map_err(|e: ::std::io::Error| panic!("srv2 poll_fn error: {}", e)); let err = block_on(res2.join(srv2)).expect_err("res2"); - match err { - ::Error::Incomplete => (), + match err.kind() { + &::error::Kind::Incomplete => (), other => panic!("expected Incomplete, found {:?}", other) } } diff --git a/src/error.rs b/src/error.rs index 45b15d12c1..1a04322eaf 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,192 +1,316 @@ //! Error and Result module. use std::error::Error as StdError; use std::fmt; -use std::io::Error as IoError; -use std::str::Utf8Error; -use std::string::FromUtf8Error; +use std::io; use httparse; use http; -use self::Error::{ - Method, - Version, - Uri, - Header, - Status, - Timeout, - Upgrade, - Closed, - Cancel, - Io, - TooLarge, - Incomplete, - Utf8, - Executor -}; - /// Result type often returned from methods that can have hyper `Error`s. pub type Result = ::std::result::Result; -/// A set of errors that can occur parsing HTTP streams. -#[derive(Debug)] -pub enum Error { - /// An invalid `Method`, such as `GE,T`. - Method, - /// An invalid `HttpVersion`, such as `HTP/1.1` - Version, - /// Uri Errors - Uri, - /// An invalid `Header`. - Header, - /// A message head is too large to be reasonable. - TooLarge, +type Cause = Box; + +/// Represents errors that can occur handling HTTP streams. +pub struct Error { + inner: Box, +} + +struct ErrorImpl { + kind: Kind, + cause: Option, +} + +#[derive(Debug, PartialEq)] +pub(crate) enum Kind { + Parse(Parse), /// A message reached EOF, but is not complete. Incomplete, - /// An invalid `Status`, such as `1337 ELITE`. - Status, - /// A timeout occurred waiting for an IO event. - Timeout, /// A protocol upgrade was encountered, but not yet supported in hyper. Upgrade, + /// A client connection received a response when not waiting for one. + MismatchedResponse, /// A pending item was dropped before ever being processed. - Cancel(Canceled), + Canceled, /// Indicates a connection is closed. Closed, /// An `io::Error` that occurred while trying to read or write to a network stream. - Io(IoError), - /// Parsing a field as string failed - Utf8(Utf8Error), - /// Executing a future failed - Executor, - - #[doc(hidden)] - __Nonexhaustive(Void) + Io, + /// Error occurred while connecting. + Connect, + /// Error creating a TcpListener. + Listen, + /// Error accepting on an Incoming stream. + Accept, + /// Error calling user's NewService::new_service(). + NewService, + /// Error from future of user's Service::call(). + Service, + /// Error while reading a body from connection. + Body, + /// Error while writing a body to connection. + BodyWrite, + /// Error calling user's Entity::poll_data(). + BodyUser, + /// Error calling AsyncWrite::shutdown() + Shutdown, + + /// User tried to create a Request with bad version. + UnsupportedVersion, + /// User tried to create a CONNECT Request with the Client. + UnsupportedRequestMethod, } -impl Error { - pub(crate) fn new_canceled>>(cause: Option) -> Error { - Error::Cancel(Canceled { - cause: cause.map(Into::into), - }) - } +#[derive(Debug, PartialEq)] +pub(crate) enum Parse { + Method, + Version, + Uri, + Header, + TooLarge, + Status, } -/// A pending item was dropped before ever being processed. -/// -/// For example, a `Request` could be queued in the `Client`, *just* -/// as the related connection gets closed by the remote. In that case, -/// when the connection drops, the pending response future will be -/// fulfilled with this error, signaling the `Request` was never started. +/* #[derive(Debug)] -pub struct Canceled { - cause: Option>, +pub(crate) enum User { + VersionNotSupported, + MethodNotSupported, + InvalidRequestUri, } +*/ -impl Canceled { - fn description(&self) -> &str { - "an operation was canceled internally before starting" +impl Error { + //TODO(error): should there be these kinds of inspection methods? + // + // - is_io() + // - is_connect() + // - is_closed() + // - etc? + + /// Returns true if this was an HTTP parse error. + pub fn is_parse(&self) -> bool { + match self.inner.kind { + Kind::Parse(_) => true, + _ => false, + } } -} -impl fmt::Display for Canceled { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad(self.description()) + /// Returns true if this error was caused by user code. + pub fn is_user(&self) -> bool { + match self.inner.kind { + Kind::BodyUser | + Kind::NewService | + Kind::Service | + Kind::Closed | + Kind::UnsupportedVersion | + Kind::UnsupportedRequestMethod => true, + _ => false, + } } -} -#[doc(hidden)] -pub struct Void(()); + /// Returns true if this was about a `Request` that was canceled. + pub fn is_canceled(&self) -> bool { + self.inner.kind == Kind::Canceled + } + + /// Returns true if a sender's channel is closed. + pub fn is_closed(&self) -> bool { + self.inner.kind == Kind::Closed + } + + pub(crate) fn new(kind: Kind, cause: Option) -> Error { + Error { + inner: Box::new(ErrorImpl { + kind, + cause, + }), + } + } + + pub(crate) fn kind(&self) -> &Kind { + &self.inner.kind + } + + pub(crate) fn new_canceled>(cause: Option) -> Error { + Error::new(Kind::Canceled, cause.map(Into::into)) + } + + pub(crate) fn new_upgrade() -> Error { + Error::new(Kind::Upgrade, None) + } + + pub(crate) fn new_incomplete() -> Error { + Error::new(Kind::Incomplete, None) + } + + pub(crate) fn new_too_large() -> Error { + Error::new(Kind::Parse(Parse::TooLarge), None) + } -impl fmt::Debug for Void { - fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { - unreachable!() + pub(crate) fn new_header() -> Error { + Error::new(Kind::Parse(Parse::Header), None) + } + + pub(crate) fn new_status() -> Error { + Error::new(Kind::Parse(Parse::Status), None) + } + + pub(crate) fn new_version() -> Error { + Error::new(Kind::Parse(Parse::Version), None) + } + + pub(crate) fn new_mismatched_response() -> Error { + Error::new(Kind::MismatchedResponse, None) + } + + pub(crate) fn new_io(cause: io::Error) -> Error { + Error::new(Kind::Io, Some(cause.into())) + } + + pub(crate) fn new_listen(err: io::Error) -> Error { + Error::new(Kind::Listen, Some(err.into())) + } + + pub(crate) fn new_accept(err: io::Error) -> Error { + Error::new(Kind::Accept, Some(Box::new(err))) + } + + pub(crate) fn new_connect>(cause: E) -> Error { + Error::new(Kind::Connect, Some(cause.into())) + } + + pub(crate) fn new_closed() -> Error { + Error::new(Kind::Closed, None) + } + + pub(crate) fn new_body(cause: io::Error) -> Error { + Error::new(Kind::Body, Some(Box::new(cause))) + } + + pub(crate) fn new_body_write(cause: io::Error) -> Error { + Error::new(Kind::BodyWrite, Some(Box::new(cause))) + } + + pub(crate) fn new_user_unsupported_version() -> Error { + Error::new(Kind::UnsupportedVersion, None) + } + + pub(crate) fn new_user_unsupported_request_method() -> Error { + Error::new(Kind::UnsupportedRequestMethod, None) + } + + pub(crate) fn new_user_new_service(err: io::Error) -> Error { + Error::new(Kind::NewService, Some(Box::new(err))) + } + + pub(crate) fn new_user_service>(cause: E) -> Error { + Error::new(Kind::Service, Some(cause.into())) + } + + pub(crate) fn new_user_body>(cause: E) -> Error { + Error::new(Kind::BodyUser, Some(cause.into())) + } + + pub(crate) fn new_shutdown(cause: io::Error) -> Error { + Error::new(Kind::Shutdown, Some(Box::new(cause))) + } +} + +impl fmt::Debug for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Error") + .field("kind", &self.inner.kind) + .field("cause", &self.inner.cause) + .finish() } } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - Io(ref e) => fmt::Display::fmt(e, f), - Utf8(ref e) => fmt::Display::fmt(e, f), - ref e => f.write_str(e.description()), + if let Some(ref cause) = self.inner.cause { + write!(f, "{}: {}", self.description(), cause) + } else { + f.write_str(self.description()) } } } impl StdError for Error { fn description(&self) -> &str { - match *self { - Method => "invalid Method specified", - Version => "invalid HTTP version specified", - Uri => "invalid URI", - Header => "invalid Header provided", - TooLarge => "message head is too large", - Status => "invalid Status provided", - Incomplete => "message is incomplete", - Timeout => "timeout", - Upgrade => "unsupported protocol upgrade", - Closed => "connection is closed", - Cancel(ref e) => e.description(), - Io(ref e) => e.description(), - Utf8(ref e) => e.description(), - Executor => "executor is missing or failed to spawn", - Error::__Nonexhaustive(..) => unreachable!(), - } - } + match self.inner.kind { + Kind::Parse(Parse::Method) => "invalid Method specified", + Kind::Parse(Parse::Version) => "invalid HTTP version specified", + Kind::Parse(Parse::Uri) => "invalid URI", + Kind::Parse(Parse::Header) => "invalid Header provided", + Kind::Parse(Parse::TooLarge) => "message head is too large", + Kind::Parse(Parse::Status) => "invalid Status provided", + Kind::Incomplete => "message is incomplete", + Kind::Upgrade => "unsupported protocol upgrade", + Kind::MismatchedResponse => "response received without matching request", + Kind::Closed => "connection closed", + Kind::Connect => "an error occurred trying to connect", + Kind::Canceled => "an operation was canceled internally before starting", + Kind::Listen => "error creating server listener", + Kind::Accept => "error accepting connection", + Kind::NewService => "calling user's new_service failed", + Kind::Service => "error from user's server service", + Kind::Body => "error reading a body from connection", + Kind::BodyWrite => "error write a body to connection", + Kind::BodyUser => "error from user's Entity stream", + Kind::Shutdown => "error shutting down connection", + Kind::UnsupportedVersion => "request has unsupported HTTP version", + Kind::UnsupportedRequestMethod => "request has unsupported HTTP method", - fn cause(&self) -> Option<&StdError> { - match *self { - Io(ref error) => Some(error), - Utf8(ref error) => Some(error), - Cancel(ref e) => e.cause.as_ref().map(|e| &**e as &StdError), - Error::__Nonexhaustive(..) => unreachable!(), - _ => None, + Kind::Io => "an IO error occurred", } } -} -impl From for Error { - fn from(err: IoError) -> Error { - Io(err) - } -} - -impl From for Error { - fn from(err: Utf8Error) -> Error { - Utf8(err) + fn cause(&self) -> Option<&StdError> { + self + .inner + .cause + .as_ref() + .map(|cause| &**cause as &StdError) } } -impl From for Error { - fn from(err: FromUtf8Error) -> Error { - Utf8(err.utf8_error()) +#[doc(hidden)] +impl From for Error { + fn from(err: Parse) -> Error { + Error::new(Kind::Parse(err), None) } } -impl From for Error { - fn from(err: httparse::Error) -> Error { +impl From for Parse { + fn from(err: httparse::Error) -> Parse { match err { httparse::Error::HeaderName | httparse::Error::HeaderValue | httparse::Error::NewLine | - httparse::Error::Token => Header, - httparse::Error::Status => Status, - httparse::Error::TooManyHeaders => TooLarge, - httparse::Error::Version => Version, + httparse::Error::Token => Parse::Header, + httparse::Error::Status => Parse::Status, + httparse::Error::TooManyHeaders => Parse::TooLarge, + httparse::Error::Version => Parse::Version, } } } -impl From for Error { - fn from(_: http::method::InvalidMethod) -> Error { - Error::Method +impl From for Parse { + fn from(_: http::method::InvalidMethod) -> Parse { + Parse::Method + } +} + +impl From for Parse { + fn from(_: http::status::InvalidStatusCode) -> Parse { + Parse::Status } } -impl From for Error { - fn from(_: http::uri::InvalidUriBytes) -> Error { - Error::Uri +impl From for Parse { + fn from(_: http::uri::InvalidUriBytes) -> Parse { + Parse::Uri } } @@ -195,58 +319,3 @@ trait AssertSendSync: Send + Sync + 'static {} #[doc(hidden)] impl AssertSendSync for Error {} -#[cfg(test)] -mod tests { - use std::error::Error as StdError; - use std::io; - use httparse; - use super::Error; - use super::Error::*; - - #[test] - fn test_cause() { - let orig = io::Error::new(io::ErrorKind::Other, "other"); - let desc = orig.description().to_owned(); - let e = Io(orig); - assert_eq!(e.cause().unwrap().description(), desc); - } - - macro_rules! from { - ($from:expr => $error:pat) => { - match Error::from($from) { - e @ $error => { - assert!(e.description().len() >= 5); - } , - e => panic!("{:?}", e) - } - } - } - - macro_rules! from_and_cause { - ($from:expr => $error:pat) => { - match Error::from($from) { - e @ $error => { - let desc = e.cause().unwrap().description(); - assert_eq!(desc, $from.description().to_owned()); - assert_eq!(desc, e.description()); - }, - _ => panic!("{:?}", $from) - } - } - } - - #[test] - fn test_from() { - - from_and_cause!(io::Error::new(io::ErrorKind::Other, "other") => Io(..)); - - from!(httparse::Error::HeaderName => Header); - from!(httparse::Error::HeaderName => Header); - from!(httparse::Error::HeaderValue => Header); - from!(httparse::Error::NewLine => Header); - from!(httparse::Error::Status => Status); - from!(httparse::Error::Token => Header); - from!(httparse::Error::TooManyHeaders => TooLarge); - from!(httparse::Error::Version => Version); - } -} diff --git a/src/proto/body.rs b/src/proto/body.rs index 8adf3c374e..ab3eafef73 100644 --- a/src/proto/body.rs +++ b/src/proto/body.rs @@ -18,8 +18,7 @@ pub trait Entity { type Data: AsRef<[u8]>; /// The error type of this stream. - //TODO: add bounds Into<::error::User> (or whatever it is called) - type Error; + type Error: Into>; /// Poll for a `Data` buffer. /// @@ -141,7 +140,7 @@ enum Kind { _close_tx: oneshot::Sender<()>, rx: mpsc::Receiver>, }, - Wrapped(Box + Send>), + Wrapped(Box> + Send>), Once(Option), Empty, } @@ -212,17 +211,22 @@ impl Body { /// " ", /// "world", /// ]; - /// let stream = futures::stream::iter_ok(chunks); + /// + /// let stream = futures::stream::iter_ok::<_, ::std::io::Error>(chunks); /// /// let body = Body::wrap_stream(stream); /// # } /// ``` pub fn wrap_stream(stream: S) -> Body where - S: Stream + Send + 'static, + S: Stream + Send + 'static, + S::Error: Into>, Chunk: From, { - Body::new(Kind::Wrapped(Box::new(stream.map(Chunk::from)))) + let mapped = stream + .map(Chunk::from) + .map_err(Into::into); + Body::new(Kind::Wrapped(Box::new(mapped))) } /// Convert this `Body` into a `Stream`. @@ -327,7 +331,7 @@ impl Body { Async::Ready(None) => Ok(Async::Ready(None)), Async::Pending => Ok(Async::Pending), }, - Kind::Wrapped(ref mut s) => s.poll_next(cx), + Kind::Wrapped(ref mut s) => s.poll_next(cx).map_err(|e| unimplemented!("Body wrapped stream error: {}", e)), Kind::Once(ref mut val) => Ok(Async::Ready(val.take())), Kind::Empty => Ok(Async::Ready(None)), } diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index b3a287ba28..d240278904 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -20,7 +20,7 @@ use super::{EncodedBuf, Encoder, Decoder}; /// The connection will determine when a message begins and ends as well as /// determine if this connection can be kept alive after the message, /// or if it is complete. -pub struct Conn { +pub(crate) struct Conn { io: Buffered>>, state: State, _marker: PhantomData @@ -146,7 +146,8 @@ where I: AsyncRead + AsyncWrite, _ => { error!("unimplemented HTTP Version = {:?}", version); self.state.close_read(); - return Err(::Error::Version); + //TODO: replace this with a more descriptive error + return Err(::Error::new_version()); } }; self.state.version = version; @@ -245,7 +246,7 @@ where I: AsyncRead + AsyncWrite, if self.is_mid_message() { self.maybe_park_read(cx); } else { - self.require_empty_read(cx)?; + self.require_empty_read(cx).map_err(::Error::new_io)?; } Ok(()) } diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 74036f12db..a0ffb3bb02 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -1,5 +1,3 @@ -use std::io; - use bytes::Bytes; use futures::{Async, Future, Poll, Stream}; use futures::task; @@ -8,10 +6,9 @@ use http::{Request, Response, StatusCode}; use proto::body::Entity; use proto::{Body, BodyLength, Conn, Http1Transaction, MessageHead, RequestHead, RequestLine, ResponseHead}; +use service::Service; -use ::service::Service; - -pub struct Dispatcher { +pub(crate) struct Dispatcher { conn: Conn, dispatch: D, body_tx: Option<::proto::body::Sender>, @@ -19,7 +16,7 @@ pub struct Dispatcher { is_closing: bool, } -pub trait Dispatch { +pub(crate) trait Dispatch { type PollItem; type PollBody; type RecvItem; @@ -49,7 +46,7 @@ where I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, T: Http1Transaction, - Bs: Entity, + Bs: Entity, { pub fn new(dispatch: D, conn: Conn) -> Self { Dispatcher { @@ -100,7 +97,7 @@ where if self.is_done() { if should_shutdown { - try_ready!(self.conn.shutdown(cx)); + try_ready!(self.conn.shutdown(cx).map_err(::Error::new_shutdown)); } self.conn.take_error()?; Ok(Async::Ready(())) @@ -154,7 +151,7 @@ where return Ok(Async::Pending); } Err(e) => { - body.send_error(::Error::Io(e)); + body.send_error(::Error::new_body(e)); } } } else { @@ -227,14 +224,14 @@ where } else if !self.conn.can_buffer_body() { try_ready!(self.poll_flush(cx)); } else if let Some(mut body) = self.body_rx.take() { - let chunk = match body.poll_data(cx)? { + let chunk = match body.poll_data(cx).map_err(::Error::new_user_body)? { Async::Ready(Some(chunk)) => { self.body_rx = Some(body); chunk }, Async::Ready(None) => { if self.conn.can_write_body() { - self.conn.write_body(cx, None)?; + self.conn.write_body(cx, None).map_err(::Error::new_body_write)?; } continue; }, @@ -245,7 +242,7 @@ where }; if self.conn.can_write_body() { - self.conn.write_body(cx, Some(chunk))?; + self.conn.write_body(cx, Some(chunk)).map_err(::Error::new_body_write)?; // This allows when chunk is `None`, or `Some([])`. } else if chunk.as_ref().len() == 0 { // ok @@ -261,7 +258,7 @@ where fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> { self.conn.flush(cx).map_err(|err| { debug!("error writing: {}", err); - err.into() + ::Error::new_body_write(err) }) } @@ -296,7 +293,7 @@ where I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, T: Http1Transaction, - Bs: Entity, + Bs: Entity, { type Item = (); type Error = ::Error; @@ -320,8 +317,9 @@ impl Server where S: Service { impl Dispatch for Server where - S: Service, Response=Response, Error=::Error>, - Bs: Entity, + S: Service, Response=Response>, + S::Error: Into>, + Bs: Entity, { type PollItem = MessageHead; type PollBody = Bs; @@ -329,7 +327,7 @@ where fn poll_msg(&mut self, cx: &mut task::Context) -> Poll)>, ::Error> { if let Some(mut fut) = self.in_flight.take() { - let resp = match fut.poll(cx)? { + let resp = match fut.poll(cx).map_err(::Error::new_user_service)? { Async::Ready(res) => res, Async::Pending => { self.in_flight = Some(fut); @@ -391,7 +389,7 @@ impl Client { impl Dispatch for Client where - B: Entity, + B: Entity, { type PollItem = RequestHead; type PollBody = B; @@ -445,7 +443,7 @@ where let _ = cb.send(Ok(res)); Ok(()) } else { - Err(::Error::Io(io::Error::new(io::ErrorKind::InvalidData, "response received without matching request"))) + Err(::Error::new_mismatched_response()) } }, Err(err) => { @@ -507,12 +505,17 @@ mod tests { let a1 = dispatcher.poll(cx).expect("error should be sent on channel"); assert!(a1.is_ready(), "dispatcher should be closed"); - let result = res_rx.poll(cx) + let async = res_rx.poll(cx) .expect("callback poll"); - match result { - Async::Ready(Err((::Error::Cancel(_), Some(_)))) => (), - other => panic!("expected Err(Canceled), got {:?}", other), + let err = match async { + Async::Ready(result) => result.unwrap_err(), + Async::Pending => panic!("callback should be ready"), + }; + + match (err.0.kind(), err.1) { + (&::error::Kind::Canceled, Some(_)) => (), + other => panic!("expected Canceled, got {:?}", other), } Ok::<_, ()>(()) })).unwrap(); diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index 426a10c4d5..3af99e3d5e 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -108,7 +108,7 @@ where } } - pub fn parse(&mut self, cx: &mut task::Context) -> Poll, ::Error> { + pub(super) fn parse(&mut self, cx: &mut task::Context) -> Poll, ::Error> { loop { match try!(S::parse(&mut self.read_buf)) { Some((head, len)) => { @@ -118,14 +118,14 @@ where None => { if self.read_buf.capacity() >= self.max_buf_size { debug!("max_buf_size ({}) reached, closing", self.max_buf_size); - return Err(::Error::TooLarge); + return Err(::Error::new_too_large()); } }, } - match try_ready!(self.read_from_io(cx)) { + match try_ready!(self.read_from_io(cx).map_err(::Error::new_io)) { 0 => { trace!("parse eof"); - return Err(::Error::Incomplete); + return Err(::Error::new_incomplete()); } _ => {}, } diff --git a/src/proto/h1/mod.rs b/src/proto/h1/mod.rs index 4ff94ee8e0..42af891b14 100644 --- a/src/proto/h1/mod.rs +++ b/src/proto/h1/mod.rs @@ -1,11 +1,11 @@ -pub use self::conn::Conn; +pub(crate) use self::conn::Conn; pub use self::decode::Decoder; pub use self::encode::{EncodedBuf, Encoder}; mod conn; mod date; mod decode; -pub mod dispatch; +pub(crate) mod dispatch; mod encode; mod io; pub mod role; diff --git a/src/proto/h1/role.rs b/src/proto/h1/role.rs index 2c1320cbd3..035e6d90a6 100644 --- a/src/proto/h1/role.rs +++ b/src/proto/h1/role.rs @@ -40,7 +40,7 @@ where let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS]; trace!("Request.parse([Header; {}], [u8; {}])", headers.len(), buf.len()); let mut req = httparse::Request::new(&mut headers); - match try!(req.parse(&buf)) { + match req.parse(&buf)? { httparse::Status::Complete(len) => { trace!("Request.parse Complete({})", len); let method = Method::from_bytes(req.method.unwrap().as_bytes())?; @@ -104,18 +104,18 @@ where // mal-formed. A server should respond with 400 Bad Request. if head.version == Version::HTTP_10 { debug!("HTTP/1.0 cannot have Transfer-Encoding header"); - Err(::Error::Header) + Err(::Error::new_header()) } else if headers::transfer_encoding_is_chunked(&head.headers) { Ok(Decode::Normal(Decoder::chunked())) } else { debug!("request with transfer-encoding header, but not chunked, bad request"); - Err(::Error::Header) + Err(::Error::new_header()) } } else if let Some(len) = headers::content_length_parse(&head.headers) { Ok(Decode::Normal(Decoder::length(len))) } else if head.headers.contains_key(CONTENT_LENGTH) { debug!("illegal Content-Length header"); - Err(::Error::Header) + Err(::Error::new_header()) } else { Ok(Decode::Normal(Decoder::length(0))) } @@ -146,7 +146,8 @@ where head = MessageHead::default(); head.subject = StatusCode::INTERNAL_SERVER_ERROR; headers::content_length_zero(&mut head.headers); - Err(::Error::Status) + //TODO: change this to a more descriptive error than just a parse error + Err(::Error::new_status()) } else { Ok(Server::set_length(&mut head, body, method.as_ref())) }; @@ -184,14 +185,15 @@ where } fn on_error(err: &::Error) -> Option> { - let status = match err { - &::Error::Method | - &::Error::Version | - &::Error::Header /*| - &::Error::Uri(_)*/ => { + use ::error::{Kind, Parse}; + let status = match *err.kind() { + Kind::Parse(Parse::Method) | + Kind::Parse(Parse::Version) | + Kind::Parse(Parse::Header) | + Kind::Parse(Parse::Uri) => { StatusCode::BAD_REQUEST }, - &::Error::TooLarge => { + Kind::Parse(Parse::TooLarge) => { StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE } _ => return None, @@ -271,7 +273,7 @@ where match try!(res.parse(bytes)) { httparse::Status::Complete(len) => { trace!("Response.parse Complete({})", len); - let status = try!(StatusCode::from_u16(res.code.unwrap()).map_err(|_| ::Error::Status)); + let status = StatusCode::from_u16(res.code.unwrap())?; let version = if res.version.unwrap() == 1 { Version::HTTP_11 } else { @@ -343,7 +345,7 @@ where // mal-formed. A server should respond with 400 Bad Request. if inc.version == Version::HTTP_10 { debug!("HTTP/1.0 cannot have Transfer-Encoding header"); - Err(::Error::Header) + Err(::Error::new_header()) } else if headers::transfer_encoding_is_chunked(&inc.headers) { Ok(Decode::Normal(Decoder::chunked())) } else { @@ -354,7 +356,7 @@ where Ok(Decode::Normal(Decoder::length(len))) } else if inc.headers.contains_key(CONTENT_LENGTH) { debug!("illegal Content-Length header"); - Err(::Error::Header) + Err(::Error::new_header()) } else { trace!("neither Transfer-Encoding nor Content-Length"); Ok(Decode::Normal(Decoder::eof())) @@ -577,12 +579,13 @@ impl OnUpgrade for NoUpgrades { *head = MessageHead::default(); head.subject = ::StatusCode::INTERNAL_SERVER_ERROR; headers::content_length_zero(&mut head.headers); - Err(::Error::Status) + //TODO: replace with more descriptive error + return Err(::Error::new_status()); } fn on_decode_upgrade() -> ::Result { debug!("received 101 upgrade response, not supported"); - return Err(::Error::Upgrade); + return Err(::Error::new_upgrade()); } } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index b0e2a9d531..71356ef62b 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -6,7 +6,7 @@ use headers; pub use self::body::Body; pub use self::chunk::Chunk; -pub use self::h1::{dispatch, Conn}; +pub(crate) use self::h1::{dispatch, Conn}; pub mod body; mod chunk; @@ -60,14 +60,14 @@ pub fn expecting_continue(version: Version, headers: &HeaderMap) -> bool { version == Version::HTTP_11 && headers::expect_continue(headers) } -pub type ServerTransaction = h1::role::Server; +pub(crate) type ServerTransaction = h1::role::Server; //pub type ServerTransaction = h1::role::Server; //pub type ServerUpgradeTransaction = h1::role::Server; -pub type ClientTransaction = h1::role::Client; -pub type ClientUpgradeTransaction = h1::role::Client; +pub(crate) type ClientTransaction = h1::role::Client; +pub(crate) type ClientUpgradeTransaction = h1::role::Client; -pub trait Http1Transaction { +pub(crate) trait Http1Transaction { type Incoming; type Outgoing: Default; fn parse(bytes: &mut BytesMut) -> ParseResult; @@ -84,7 +84,7 @@ pub trait Http1Transaction { fn should_read_first() -> bool; } -pub type ParseResult = ::Result, usize)>>; +pub(crate) type ParseResult = Result, usize)>, ::error::Parse>; #[derive(Debug)] pub enum BodyLength { diff --git a/src/server/conn.rs b/src/server/conn.rs index 22475b02e0..8d4abe3c75 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -26,7 +26,7 @@ use super::{HyperService, Request, Response, Service}; pub struct Connection where S: HyperService, - S::ResponseBody: Entity, + S::ResponseBody: Entity, { pub(super) conn: proto::dispatch::Dispatcher< proto::dispatch::Server, @@ -60,9 +60,11 @@ pub struct Parts { // ===== impl Connection ===== impl Connection -where S: Service, Response = Response, Error = ::Error> + 'static, - I: AsyncRead + AsyncWrite + 'static, - B: Entity + 'static, +where + S: Service, Response=Response> + 'static, + S::Error: Into>, + I: AsyncRead + AsyncWrite + 'static, + B: Entity + 'static, { /// Disables keep-alive for this connection. pub fn disable_keep_alive(&mut self) { @@ -97,9 +99,11 @@ where S: Service, Response = Response, Error = ::Erro } impl Future for Connection -where S: Service, Response = Response, Error = ::Error> + 'static, - I: AsyncRead + AsyncWrite + 'static, - B: Entity + 'static, +where + S: Service, Response=Response> + 'static, + S::Error: Into>, + I: AsyncRead + AsyncWrite + 'static, + B: Entity + 'static, { type Item = (); type Error = ::Error; @@ -112,7 +116,7 @@ where S: Service, Response = Response, Error = ::Erro impl fmt::Debug for Connection where S: HyperService, - S::ResponseBody: Entity, + S::ResponseBody: Entity, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Connection") diff --git a/src/server/mod.rs b/src/server/mod.rs index e4ddf24742..b233ba5dff 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -50,7 +50,7 @@ pub struct Http { /// address and then serving TCP connections accepted with the service provided. pub struct Server where - B: Entity, + B: Entity, { protocol: Http, new_service: S, @@ -164,12 +164,14 @@ impl + 'static> Http { /// The returned `Server` contains one method, `run`, which is used to /// actually run the server. pub fn bind(&self, addr: &SocketAddr, new_service: S) -> ::Result> - where S: NewService, Response = Response, Error = ::Error> + 'static, - Bd: Entity, + where + S: NewService, Response=Response> + 'static, + S::Error: Into>, + Bd: Entity, { let handle = Handle::current(); - let std_listener = StdTcpListener::bind(addr)?; - let listener = try!(TcpListener::from_std(std_listener, &handle)); + let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?; + let listener = TcpListener::from_std(std_listener, &handle).map_err(::Error::new_listen)?; Ok(Server { new_service: new_service, @@ -187,13 +189,15 @@ impl + 'static> Http { /// `new_service` object provided as well, creating a new service per /// connection. pub fn serve_addr(&self, addr: &SocketAddr, new_service: S) -> ::Result> - where S: NewService, Response = Response, Error = ::Error>, - Bd: Entity, + where + S: NewService, Response=Response>, + S::Error: Into>, + Bd: Entity, { let handle = Handle::current(); - let std_listener = StdTcpListener::bind(addr)?; - let listener = TcpListener::from_std(std_listener, &handle)?; - let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors)?; + let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?; + let listener = TcpListener::from_std(std_listener, &handle).map_err(::Error::new_listen)?; + let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors).map_err(::Error::new_listen)?; if self.keep_alive { incoming.set_keepalive(Some(Duration::from_secs(90))); } @@ -209,12 +213,15 @@ impl + 'static> Http { /// `new_service` object provided as well, creating a new service per /// connection. pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result> - where S: NewService, Response = Response, Error = ::Error>, - Bd: Entity, + where + S: NewService, Response = Response>, + S::Error: Into>, + Bd: Entity, { - let std_listener = StdTcpListener::bind(addr)?; - let listener = TcpListener::from_std(std_listener, &handle)?; - let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors)?; + let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?; + let listener = TcpListener::from_std(std_listener, &handle).map_err(::Error::new_listen)?; + let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors).map_err(::Error::new_listen)?; + if self.keep_alive { incoming.set_keepalive(Some(Duration::from_secs(90))); } @@ -225,10 +232,12 @@ impl + 'static> Http { /// /// This method allows the ability to share a `Core` with multiple servers. pub fn serve_incoming(&self, incoming: I, new_service: S) -> Serve - where I: Stream, - I::Item: AsyncRead + AsyncWrite, - S: NewService, Response = Response, Error = ::Error>, - Bd: Entity, + where + I: Stream, + I::Item: AsyncRead + AsyncWrite, + S: NewService, Response = Response>, + S::Error: Into>, + Bd: Entity, { Serve { incoming: incoming, @@ -277,9 +286,11 @@ impl + 'static> Http { /// # fn main() {} /// ``` pub fn serve_connection(&self, io: I, service: S) -> Connection - where S: Service, Response = Response, Error = ::Error>, - Bd: Entity, - I: AsyncRead + AsyncWrite, + where + S: Service, Response = Response>, + S::Error: Into>, + Bd: Entity, + I: AsyncRead + AsyncWrite, { let mut conn = proto::Conn::new(io); if !self.keep_alive { @@ -339,15 +350,19 @@ impl Future for Run { impl Server - where S: NewService, Response = Response, Error = ::Error> + Send + 'static, - ::Instance: Send, - <::Instance as Service>::Future: Send, - B: Entity + Send + 'static, - B::Data: Send, +where + S: NewService, Response = Response> + Send + 'static, + S::Error: Into>, + ::Instance: Send, + <::Instance as Service>::Future: Send, + B: Entity + Send + 'static, + B::Data: Send, { /// Returns the local address that this server is bound to. pub fn local_addr(&self) -> ::Result { - Ok(try!(self.listener.local_addr())) + //TODO: this shouldn't return an error at all, but should get the + //local_addr at construction + self.listener.local_addr().map_err(::Error::new_io) } /// Configure the amount of time this server will wait for a "graceful @@ -391,7 +406,7 @@ impl Server let mut incoming = match AddrIncoming::new(listener, handle.clone(), protocol.sleep_on_errors) { Ok(incoming) => incoming, - Err(err) => return Run(Box::new(future::err(err.into()))), + Err(err) => return Run(Box::new(future::err(::Error::new_listen(err)))), }; if protocol.keep_alive { @@ -441,10 +456,11 @@ impl Server let main_execution = shutdown_signal.select(srv).then(move |result| { match result { Ok(_) => {}, + //Err((e, _other)) => return future::Either::A(future::err(::Error::new_accept(e))) Err(future::Either::Left((e, _other))) => return future::Either::Left(future::err(e)), Err(future::Either::Right((e, _other))) => - return future::Either::Left(future::err(e.into())), + return future::Either::Left(future::err(::Error::new_accept(e))), } // Ok we've stopped accepting new connections at this point, but we want @@ -459,8 +475,9 @@ impl Server future::Either::Right(wait.select(timeout).then(|result| { match result { Ok(_) => Ok(()), - Err(future::Either::Left((e, _))) => Err(e.into()), - Err(future::Either::Right((e, _))) => Err(e.into()) + //TODO: error variant should be "timed out waiting for graceful shutdown" + Err(future::Either::Left((e, _))) => Err(::Error::new_io(e)), + Err(future::Either::Right((e, _))) => Err(::Error::new_io(e)) } })) }); @@ -469,11 +486,10 @@ impl Server } } -impl> fmt::Debug for Server +impl fmt::Debug for Server { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Server") - .field("reactor", &"...") .field("listener", &self.listener) .field("new_service", &self.new_service) .field("protocol", &self.protocol) @@ -505,15 +521,16 @@ impl Stream for Serve where I: Stream, I::Item: AsyncRead + AsyncWrite, - S: NewService, Response=Response, Error=::Error>, - B: Entity, + S: NewService, Response=Response>, + S::Error: Into>, + B: Entity, { type Item = Connection; type Error = ::Error; fn poll_next(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { - if let Some(io) = try_ready!(self.incoming.poll_next(cx)) { - let service = self.new_service.new_service()?; + if let Some(io) = try_ready!(self.incoming.poll_next(cx).map_err(::Error::new_accept)) { + let service = self.new_service.new_service().map_err(::Error::new_user_new_service)?; Ok(Async::Ready(Some(self.protocol.serve_connection(io, service)))) } else { Ok(Async::Ready(None)) @@ -585,6 +602,12 @@ impl AddrIncoming { fn set_keepalive(&mut self, dur: Option) { self.keep_alive_timeout = dur; } + + /* + fn set_sleep_on_errors(&mut self, val: bool) { + self.sleep_on_errors = val; + } + */ } impl Stream for AddrIncoming { @@ -805,9 +828,9 @@ mod hyper_service { S: Service< Request=Request, Response=Response, - Error=::Error, >, - B: Entity, + S::Error: Into>, + B: Entity, {} impl HyperService for S @@ -815,10 +838,10 @@ mod hyper_service { S: Service< Request=Request, Response=Response, - Error=::Error, >, + S::Error: Into>, S: Sealed, - B: Entity, + B: Entity, { type ResponseBody = B; type Sealed = Opaque; diff --git a/tests/client.rs b/tests/client.rs index bc5d82fe7f..355e19d3d2 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -7,14 +7,14 @@ extern crate net2; extern crate tokio; extern crate pretty_env_logger; -use std::io::{self, Read, Write}; +use std::io::{Read, Write}; use std::net::{SocketAddr, TcpListener}; use std::thread; use std::time::Duration; use hyper::{Body, Client, Method, Request, StatusCode}; -use futures::{FutureExt, StreamExt}; +use futures::{Future, FutureExt, StreamExt}; use futures::channel::oneshot; use futures::executor::block_on; use tokio::reactor::Handle; @@ -138,7 +138,7 @@ macro_rules! test { let _ = pretty_env_logger::try_init(); let runtime = Runtime::new().expect("runtime new"); - let err = test! { + let err: ::hyper::Error = test! { INNER; name: $name, runtime: &runtime, @@ -153,7 +153,11 @@ macro_rules! test { headers: { $($request_header_name => $request_header_val,)* }, body: $request_body, }.unwrap_err(); - if !$err(&err) { + + fn infer_closure bool>(f: F) -> F { f } + + let closure = infer_closure($err); + if !closure(&err) { panic!("expected error, unexpected variant: {:?}", err) } } @@ -224,7 +228,7 @@ macro_rules! test { let _ = tx.send(()); }).expect("thread spawn"); - let rx = rx.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx.expect("thread panicked"); block_on(res.join(rx).map(|r| r.0)) }); @@ -481,10 +485,7 @@ test! { url: "http://{addr}/err", headers: {}, body: None, - error: |err| match err { - &hyper::Error::Incomplete => true, - _ => false, - }, + error: |err| err.to_string() == "message is incomplete", } test! { @@ -507,10 +508,8 @@ test! { url: "http://{addr}/err", headers: {}, body: None, - error: |err| match err { - &hyper::Error::Version => true, - _ => false, - }, + // should get a Parse(Version) error + error: |err| err.is_parse(), } @@ -570,10 +569,7 @@ test! { url: "http://{addr}/upgrade", headers: {}, body: None, - error: |err| match err { - &hyper::Error::Upgrade => true, - _ => false, - }, + error: |err| err.to_string() == "unsupported protocol upgrade", } @@ -595,10 +591,7 @@ test! { url: "http://{addr}/", headers: {}, body: None, - error: |err| match err { - &hyper::Error::Method => true, - _ => false, - }, + error: |err| err.is_user(), } @@ -688,9 +681,10 @@ mod dispatch_impl { let res = client.request(req).with_executor(runtime.executor()).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); Delay::new(Duration::from_secs(1)) - .err_into() + .expect("timeout") }); - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); + //let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); block_on(res.join(rx).map(|r| r.0)).unwrap(); block_on(closes.next()).unwrap().0.expect("closes"); @@ -735,11 +729,12 @@ mod dispatch_impl { res.into_body().into_stream().concat() }).and_then(|_| { Delay::new(Duration::from_secs(1)) - .err_into() + .expect("timeout") }) }; // client is dropped - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); + //let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); block_on(res.join(rx).map(|r| r.0)).unwrap(); block_on(closes.next()).unwrap().0.expect("closes"); @@ -787,7 +782,8 @@ mod dispatch_impl { assert_eq!(res.status(), hyper::StatusCode::OK); res.into_body().into_stream().concat() }); - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + //let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); block_on(res.join(rx).map(|r| r.0)).unwrap(); // not closed yet, just idle @@ -903,7 +899,7 @@ mod dispatch_impl { client.request(req).with_executor(runtime.executor()) }; - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); block_on(res.join(rx).map(|r| r.0)).unwrap(); let t = Delay::new(Duration::from_millis(100)) @@ -954,7 +950,7 @@ mod dispatch_impl { assert_eq!(res.status(), hyper::StatusCode::OK); res.into_body().into_stream().concat() }); - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); block_on(res.join(rx).map(|r| r.0)).unwrap(); let t = Delay::new(Duration::from_millis(100)) @@ -1002,10 +998,9 @@ mod dispatch_impl { assert_eq!(res.status(), hyper::StatusCode::OK); res.into_body().into_stream().concat() }); - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); block_on(res.join(rx).map(|r| r.0)).unwrap(); - let t = Delay::new(Duration::from_millis(100)) .map(|_| panic!("time out")); let close = closes.next() @@ -1024,7 +1019,6 @@ mod dispatch_impl { // // See https://github.com/hyperium/hyper/issues/1429 - use std::error::Error; use tokio::prelude::Future; let _ = pretty_env_logger::try_init(); @@ -1065,8 +1059,8 @@ mod dispatch_impl { // shut down runtime runtime.shutdown_now().wait().unwrap(); let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); + let rx = rx1.expect("thread panicked"); + let rx = rx.and_then(move |_| timeout.expect("timeout")); let req = Request::builder() .uri(uri) @@ -1076,7 +1070,7 @@ mod dispatch_impl { // this does trigger an 'event loop gone' error, but before, it would // panic internally on a `SendError`, which is what we're testing against. let err = block_on(res.join(rx).map(|r| r.0)).unwrap_err(); - assert_eq!(err.description(), "event loop gone"); + assert_eq!(err.to_string(), "an error occurred trying to connect: event loop gone"); } #[test] @@ -1111,13 +1105,12 @@ mod dispatch_impl { assert_eq!(res.status(), hyper::StatusCode::OK); res.into_body().into_stream().concat() }); - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); + let rx = rx.and_then(move |_| timeout.expect("timeout")); block_on(res.join(rx).map(|r| r.0)).unwrap(); - let t = Delay::new(Duration::from_millis(100)) .map(|_| panic!("time out")); let close = closes.next() @@ -1191,7 +1184,7 @@ mod dispatch_impl { assert_eq!(connects.load(Ordering::SeqCst), 0); - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) .body(Body::empty()) @@ -1205,7 +1198,7 @@ mod dispatch_impl { // state and back into client pool thread::sleep(Duration::from_millis(50)); - let rx = rx2.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx2.expect("thread panicked"); let req = Request::builder() .uri(&*format!("http://{}/b", addr)) .body(Body::empty()) @@ -1256,7 +1249,7 @@ mod dispatch_impl { assert_eq!(connects.load(Ordering::Relaxed), 0); - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); let req = Request::builder() .method("HEAD") .uri(&*format!("http://{}/a", addr)) @@ -1267,7 +1260,7 @@ mod dispatch_impl { assert_eq!(connects.load(Ordering::Relaxed), 1); - let rx = rx2.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx2.expect("thread panicked"); let req = Request::builder() .uri(&*format!("http://{}/b", addr)) .body(Body::empty()) @@ -1308,7 +1301,7 @@ mod dispatch_impl { }); - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); let req = Request::builder() .uri(&*format!("http://{}/foo/bar", addr)) .body(Body::empty()) @@ -1410,7 +1403,7 @@ mod conn { use hyper::{self, Request}; use hyper::client::conn; - use super::{s, tcp_connect}; + use super::{s, tcp_connect, FutureHyperExt}; #[test] fn get() { @@ -1451,10 +1444,10 @@ mod conn { assert_eq!(res.status(), hyper::StatusCode::OK); res.into_body().into_stream().concat() }); - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); + let rx = rx.and_then(move |_| timeout.expect("timeout")); block_on(res.join(rx).map(|r| r.0)).unwrap(); } @@ -1497,10 +1490,10 @@ mod conn { assert_eq!(res.status(), hyper::StatusCode::OK); res.into_body().into_stream().concat() }); - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); + let rx = rx.and_then(move |_| timeout.expect("timeout")); block_on(res.join(rx).map(|r| r.0)).unwrap(); } @@ -1546,17 +1539,14 @@ mod conn { let res2 = client.send_request(req) .then(|result| { let err = result.expect_err("res2"); - match err { - hyper::Error::Cancel(..) => (), - other => panic!("expected Cancel, found {:?}", other), - } + assert!(err.is_canceled(), "err not canceled, {:?}", err); Ok(()) }); - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); + let rx = rx.and_then(move |_| timeout.expect("timeout")); block_on(res1.join(res2).join(rx).map(|r| r.0)).unwrap(); } @@ -1614,10 +1604,10 @@ mod conn { res.into_body().into_stream().concat() }); - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); + let rx = rx.and_then(move |_| timeout.expect("timeout")); block_on(until_upgrade.join(res).join(rx).map(|r| r.0)).unwrap(); // should not be ready now @@ -1697,10 +1687,10 @@ mod conn { assert_eq!(body.as_ref(), b""); }); - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx1.expect("thread panicked"); let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); + let rx = rx.and_then(move |_| timeout.expect("timeout")); block_on(until_tunneled.join(res).join(rx).map(|r| r.0)).unwrap(); // should not be ready now @@ -1749,3 +1739,17 @@ mod conn { } } } + +trait FutureHyperExt: Future { + fn expect(self, msg: &'static str) -> Box>; +} + +impl FutureHyperExt for F +where + F: Future + 'static, + F::Error: ::std::fmt::Display, +{ + fn expect(self, msg: &'static str) -> Box> { + Box::new(self.map_err(move |e| panic!("expect: {}; error={}", msg, e))) + } +} diff --git a/tests/server.rs b/tests/server.rs index 9ac7ca3460..6cd603db10 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -936,7 +936,7 @@ fn returning_1xx_response_is_error() { let socket = item.unwrap(); Http::::new() .serve_connection(socket, service_fn(|_| { - Ok(Response::builder() + Ok::<_, hyper::Error>(Response::builder() .status(StatusCode::CONTINUE) .body(Body::empty()) .unwrap()) @@ -989,7 +989,7 @@ fn upgrades() { .header("upgrade", "foobar") .body(hyper::Body::empty()) .unwrap(); - Ok(res) + Ok::<_, hyper::Error>(res) })); let mut conn_opt = Some(conn); @@ -1145,10 +1145,10 @@ fn streaming_body() { .keep_alive(false) .serve_connection(socket, service_fn(|_| { static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _; - let b = ::futures::stream::iter_ok(S.into_iter()) + let b = ::futures::stream::iter_ok::<_, String>(S.into_iter()) .map(|&s| s); let b = hyper::Body::wrap_stream(b); - Ok(Response::new(b)) + Ok::<_, hyper::Error>(Response::new(b)) })) .map(|_| ()) });