Skip to content

Commit

Permalink
feat(error): revamp hyper::Error type
Browse files Browse the repository at this point in the history
**The `Error` is now an opaque struct**, which allows for more variants to
be added freely, and the internal representation to change without being
breaking changes.

For inspecting an `Error`, there are several `is_*` methods to check for
certain classes of errors, such as `Error::is_parse()`. The `cause` can
also be inspected, like before. This likely seems like a downgrade, but
more inspection can be added as needed!

The `Error` now knows about more states, which gives much more context
around when a certain error occurs. This is also expressed in the
description and `fmt` messages.

**Most places where a user would provide an error to hyper can now pass
any error type** (`E: Into<Box<std::error::Error>>`). This error is passed
back in relevant places, and can be useful for logging. This should make
it much clearer about what error a user should provide to hyper: any it
feels is relevant!

Closes #1128
Closes #1130
Closes #1431
Closes #1338

BREAKING CHANGE: `Error` is no longer an enum to pattern match over, or
  to construct. Code will need to be updated accordingly.

  For body streams or `Service`s, inference might be unable to determine
  what error type you mean to return. Starting in Rust 1.26, you could
  just label that as `!` if you never return an error.
  • Loading branch information
seanmonstar committed Apr 6, 2018
1 parent 7c12a2c commit 16b04fa
Show file tree
Hide file tree
Showing 21 changed files with 536 additions and 426 deletions.
5 changes: 3 additions & 2 deletions examples/client.rs
Expand Up @@ -40,8 +40,9 @@ fn main() {
println!("Response: {}", res.status());
println!("Headers: {:#?}", res.headers());

res.into_parts().1.into_stream().for_each(|chunk| {
io::stdout().write_all(&chunk).map_err(From::from)
res.into_body().into_stream().for_each(|chunk| {
io::stdout().write_all(&chunk)
.map_err(|e| panic!("example expects stdout is open, error={}", e))
})
}).then(|result| {
if let Some(err) = result.err() {
Expand Down
3 changes: 2 additions & 1 deletion examples/hello.rs
Expand Up @@ -17,7 +17,8 @@ fn main() {
let addr = ([127, 0, 0, 1], 3000).into();

let new_service = const_service(service_fn(|_| {
Ok(Response::new(Body::from(PHRASE)))
//TODO: when `!` is stable, replace error type
Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE)))
}));

tokio::runtime::run2(lazy(move |_| {
Expand Down
9 changes: 4 additions & 5 deletions examples/send_file.rs
Expand Up @@ -9,7 +9,6 @@ use futures::future::lazy;
use futures::channel::oneshot;

use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode};
use hyper::error::Error;
use hyper::server::{Http, Service};

use std::fs::File;
Expand All @@ -19,7 +18,7 @@ use std::thread;
static NOTFOUND: &[u8] = b"Not Found";
static INDEX: &str = "examples/send_file_index.html";

fn simple_file_send(f: &str) -> Box<Future<Item = Response<Body>, Error = hyper::Error> + Send> {
fn simple_file_send(f: &str) -> Box<Future<Item = Response<Body>, Error = io::Error> + Send> {
// Serve a file by reading it entirely into memory. As a result
// this is limited to serving small files, but it is somewhat
// simpler with a little less overhead.
Expand Down Expand Up @@ -56,15 +55,15 @@ fn simple_file_send(f: &str) -> Box<Future<Item = Response<Body>, Error = hyper:
};
});

Box::new(rx.map_err(|e| Error::from(io::Error::new(io::ErrorKind::Other, e))))
Box::new(rx.map_err(|e| io::Error::new(io::ErrorKind::Other, e)))
}

struct ResponseExamples;

impl Service for ResponseExamples {
type Request = Request<Body>;
type Response = Response<Body>;
type Error = hyper::Error;
type Error = io::Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error> + Send>;

fn call(&self, req: Request<Body>) -> Self::Future {
Expand Down Expand Up @@ -119,7 +118,7 @@ impl Service for ResponseExamples {
*/
});

Box::new(rx.map_err(|e| Error::from(io::Error::new(io::ErrorKind::Other, e))))
Box::new(rx.map_err(|e| io::Error::new(io::ErrorKind::Other, e)))
},
(&Method::GET, "/no_file.html") => {
// Test what happens when file cannot be be found
Expand Down
24 changes: 12 additions & 12 deletions src/client/conn.rs
Expand Up @@ -46,7 +46,7 @@ pub struct SendRequest<B> {
pub struct Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
inner: proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
Expand Down Expand Up @@ -139,7 +139,7 @@ impl<B> SendRequest<B>

impl<B> SendRequest<B>
where
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
/// Sends a `Request` on the associated connection.
///
Expand Down Expand Up @@ -263,7 +263,7 @@ impl<B> fmt::Debug for SendRequest<B> {
impl<T, B> Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
/// Return the inner IO object, and additional information.
pub fn into_parts(self) -> Parts<T> {
Expand All @@ -290,7 +290,7 @@ where
impl<T, B> Future for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
type Item = ();
type Error = ::Error;
Expand All @@ -303,7 +303,7 @@ where
impl<T, B> fmt::Debug for Connection<T, B>
where
T: AsyncRead + AsyncWrite + fmt::Debug,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Connection")
Expand Down Expand Up @@ -332,7 +332,7 @@ impl Builder {
pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
Handshake {
inner: HandshakeInner {
Expand All @@ -346,7 +346,7 @@ impl Builder {
pub(super) fn handshake_no_upgrades<T, B>(&self, io: T) -> HandshakeNoUpgrades<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
HandshakeNoUpgrades {
inner: HandshakeInner {
Expand All @@ -363,7 +363,7 @@ impl Builder {
impl<T, B> Future for Handshake<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
type Item = (SendRequest<B>, Connection<T, B>);
type Error = ::Error;
Expand All @@ -388,7 +388,7 @@ impl<T, B> fmt::Debug for Handshake<T, B> {
impl<T, B> Future for HandshakeNoUpgrades<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
{
type Item = (SendRequest<B>, proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
Expand All @@ -407,7 +407,7 @@ where
impl<T, B, R> Future for HandshakeInner<T, B, R>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
R: proto::Http1Transaction<
Incoming=StatusCode,
Outgoing=proto::RequestLine,
Expand Down Expand Up @@ -471,15 +471,15 @@ impl<B: Send> AssertSendSync for SendRequest<B> {}
impl<T: Send, B: Send> AssertSend for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
B::Data: Send + 'static,
{}

#[doc(hidden)]
impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Entity<Error=::Error> + 'static,
B: Entity + 'static,
B::Data: Send + Sync + 'static,
{}

Expand Down
2 changes: 1 addition & 1 deletion src/client/connect.rs
Expand Up @@ -33,7 +33,7 @@ pub trait Connect: Send + Sync {
/// The connected IO Stream.
type Transport: AsyncRead + AsyncWrite + Send + 'static;
/// An error occured when trying to connect.
type Error;
type Error: Into<Box<StdError + Send + Sync>>;
/// A Future that will resolve to the connected Transport.
type Future: Future<Item=(Self::Transport, Connected), Error=Self::Error> + Send;
/// Connect to a destination.
Expand Down
13 changes: 8 additions & 5 deletions src/client/dispatch.rs
Expand Up @@ -38,10 +38,10 @@ impl<T, U> Sender<T, U> {
// there's room in the queue, but does the Connection
// want a message yet?
self.giver.poll_want(cx)
.map_err(|_| ::Error::Closed)
.map_err(|_| ::Error::new_closed())
},
Ok(Async::Pending) => Ok(Async::Pending),
Err(_) => Err(::Error::Closed),
Err(_) => Err(::Error::new_closed()),
}
}

Expand Down Expand Up @@ -183,9 +183,12 @@ mod tests {
drop(rx);

promise.then(|fulfilled| {
let res = fulfilled.expect("fulfilled");
match res.unwrap_err() {
(::Error::Cancel(_), Some(_)) => (),
let err = fulfilled
.expect("fulfilled")
.expect_err("promise should error");

match (err.0.kind(), err.1) {
(&::error::Kind::Canceled, Some(_)) => (),
e => panic!("expected Error::Cancel(_), found {:?}", e),
}

Expand Down
51 changes: 19 additions & 32 deletions src/client/mod.rs
Expand Up @@ -6,7 +6,7 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;

use futures::{Async, Future, FutureExt, Never, Poll};
use futures::{Async, Future, FutureExt, Poll};
use futures::channel::oneshot;
use futures::future;
use futures::task;
Expand Down Expand Up @@ -93,10 +93,10 @@ impl<C, B> Client<C, B> {
}

impl<C, B> Client<C, B>
where C: Connect<Error=io::Error> + Sync + 'static,
where C: Connect + Sync + 'static,
C::Transport: 'static,
C::Future: 'static,
B: Entity<Error=::Error> + Send + 'static,
B: Entity + Send + 'static,
B::Data: Send,
{

Expand Down Expand Up @@ -128,13 +128,14 @@ where C: Connect<Error=io::Error> + Sync + 'static,
Version::HTTP_11 => (),
other => {
error!("Request has unsupported version \"{:?}\"", other);
return FutureResponse(Box::new(future::err(::Error::Version)));
//TODO: replace this with a proper variant
return FutureResponse(Box::new(future::err(::Error::new_user_unsupported_version())));
}
}

if req.method() == &Method::CONNECT {
debug!("Client does not support CONNECT requests");
return FutureResponse(Box::new(future::err(::Error::Method)));
return FutureResponse(Box::new(future::err(::Error::new_user_unsupported_request_method())));
}

let uri = req.uri().clone();
Expand All @@ -143,7 +144,8 @@ where C: Connect<Error=io::Error> + Sync + 'static,
format!("{}://{}", scheme, auth)
}
_ => {
return FutureResponse(Box::new(future::err(::Error::Io(
//TODO: replace this with a proper variant
return FutureResponse(Box::new(future::err(::Error::new_io(
io::Error::new(
io::ErrorKind::InvalidInput,
"invalid URI for Client Request"
Expand Down Expand Up @@ -190,16 +192,16 @@ where C: Connect<Error=io::Error> + Sync + 'static,
};
future::lazy(move |_| {
connector.connect(dst)
.err_into()
.map_err(::Error::new_connect)
.and_then(move |(io, connected)| {
conn::Builder::new()
.h1_writev(h1_writev)
.handshake_no_upgrades(io)
.and_then(move |(tx, conn)| {
future::lazy(move |cx| {
execute(conn.recover(|e| {
cx.spawn(conn.recover(|e| {
debug!("client connection error: {}", e);
}), cx)?;
}));
Ok(pool.pooled(pool_key, PoolClient {
is_proxied: connected.is_proxied,
tx: tx,
Expand Down Expand Up @@ -251,8 +253,6 @@ where C: Connect<Error=io::Error> + Sync + 'static,
} else if !res.body().is_empty() {
let (delayed_tx, delayed_rx) = oneshot::channel();
res.body_mut().delayed_eof(delayed_rx);
// If the executor doesn't have room, oh well. Things will likely
// be blowing up soon, but this specific task isn't required.
let fut = future::poll_fn(move |cx| {
pooled.tx.poll_ready(cx)
})
Expand All @@ -262,7 +262,7 @@ where C: Connect<Error=io::Error> + Sync + 'static,
drop(delayed_tx);
Ok(())
});
execute(fut, cx).ok();
cx.spawn(fut);
}
Ok(res)
})
Expand All @@ -277,9 +277,9 @@ where C: Connect<Error=io::Error> + Sync + 'static,
}

impl<C, B> Service for Client<C, B>
where C: Connect<Error=io::Error> + 'static,
where C: Connect + 'static,
C::Future: 'static,
B: Entity<Error=::Error> + Send + 'static,
B: Entity + Send + 'static,
B::Data: Send,
{
type Request = Request<B>;
Expand Down Expand Up @@ -339,9 +339,9 @@ struct RetryableSendRequest<C, B> {

impl<C, B> Future for RetryableSendRequest<C, B>
where
C: Connect<Error=io::Error> + 'static,
C: Connect + 'static,
C::Future: 'static,
B: Entity<Error=::Error> + Send + 'static,
B: Entity + Send + 'static,
B::Data: Send,
{
type Item = Response<Body>;
Expand Down Expand Up @@ -550,10 +550,10 @@ impl<C, B> Config<C, B> {
}

impl<C, B> Config<C, B>
where C: Connect<Error=io::Error>,
where C: Connect,
C::Transport: 'static,
C::Future: 'static,
B: Entity<Error=::Error> + Send,
B: Entity + Send,
B::Data: Send,
{
/// Construct the Client with this configuration.
Expand All @@ -565,7 +565,7 @@ where C: Connect<Error=io::Error>,

impl<B> Config<UseDefaultConnector, B>
where
B: Entity<Error=::Error> + Send,
B: Entity + Send,
B::Data: Send,
{
/// Construct the Client with this configuration.
Expand Down Expand Up @@ -600,16 +600,3 @@ impl<C: Clone, B> Clone for Config<C, B> {
}
}


fn execute<F>(fut: F, cx: &mut task::Context) -> Result<(), ::Error>
where F: Future<Item=(), Error=Never> + Send + 'static,
{
if let Some(executor) = cx.executor() {
executor.spawn(Box::new(fut)).map_err(|err| {
debug!("executor error: {:?}", err);
::Error::Executor
})
} else {
Err(::Error::Executor)
}
}

0 comments on commit 16b04fa

Please sign in to comment.