Skip to content

Commit

Permalink
Revert returning Error when using blocking API inside an Executor
Browse files Browse the repository at this point in the history
Log a warning instead.

cc #541
  • Loading branch information
seanmonstar committed Oct 9, 2019
1 parent 98ffbd7 commit 5da0ee6
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 29 deletions.
3 changes: 0 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,9 +655,6 @@ impl ClientHandle {
let res = match wait::timeout(fut, self.timeout.0) {
Ok(res) => res,
Err(wait::Waited::TimedOut) => return Err(::error::timedout(Some(url))),
Err(wait::Waited::Executor(err)) => {
return Err(::error::from(err).with_url(url))
},
Err(wait::Waited::Inner(err)) => {
return Err(err.with_url(url));
},
Expand Down
23 changes: 3 additions & 20 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use std::error::Error as StdError;
use std::fmt;
use std::io;

use tokio_executor::EnterError;

use {StatusCode, Url};

/// The Errors that may occur when processing a `Request`.
Expand Down Expand Up @@ -153,8 +151,7 @@ impl Error {
Kind::RedirectLoop |
Kind::Status(_) |
Kind::UnknownProxyScheme |
Kind::Timer |
Kind::BlockingClientInFutureContext => None,
Kind::Timer => None,
}
}

Expand Down Expand Up @@ -248,8 +245,6 @@ impl fmt::Debug for Error {
}
}

static BLOCK_IN_FUTURE: &'static str = "blocking Client used inside a Future context";

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(ref url) = self.inner.url {
Expand Down Expand Up @@ -289,7 +284,6 @@ impl fmt::Display for Error {
}
Kind::UnknownProxyScheme => f.write_str("Unknown proxy scheme"),
Kind::Timer => f.write_str("timer unavailable"),
Kind::BlockingClientInFutureContext => f.write_str(BLOCK_IN_FUTURE),
}
}
}
Expand Down Expand Up @@ -326,7 +320,6 @@ impl StdError for Error {
}
Kind::UnknownProxyScheme => "Unknown proxy scheme",
Kind::Timer => "timer unavailable",
Kind::BlockingClientInFutureContext => BLOCK_IN_FUTURE,
}
}

Expand Down Expand Up @@ -354,8 +347,7 @@ impl StdError for Error {
Kind::RedirectLoop |
Kind::Status(_) |
Kind::UnknownProxyScheme |
Kind::Timer |
Kind::BlockingClientInFutureContext => None,
Kind::Timer => None,
}
}

Expand All @@ -381,8 +373,7 @@ impl StdError for Error {
Kind::RedirectLoop |
Kind::Status(_) |
Kind::UnknownProxyScheme |
Kind::Timer |
Kind::BlockingClientInFutureContext => None,
Kind::Timer => None,
}
}
}
Expand Down Expand Up @@ -410,7 +401,6 @@ pub(crate) enum Kind {
Status(StatusCode),
UnknownProxyScheme,
Timer,
BlockingClientInFutureContext,
}


Expand Down Expand Up @@ -482,18 +472,11 @@ where T: Into<Kind> {
fn from(err: ::wait::Waited<T>) -> Kind {
match err {
::wait::Waited::TimedOut => io_timeout().into(),
::wait::Waited::Executor(e) => e.into(),
::wait::Waited::Inner(e) => e.into(),
}
}
}

impl From<EnterError> for Kind {
fn from(_err: EnterError) -> Kind {
Kind::BlockingClientInFutureContext
}
}

impl From<::tokio::timer::Error> for Kind {
fn from(_err: ::tokio::timer::Error) -> Kind {
Kind::Timer
Expand Down
3 changes: 0 additions & 3 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ impl Response {
wait::timeout(self.inner.json(), self.timeout).map_err(|e| {
match e {
wait::Waited::TimedOut => ::error::timedout(None),
wait::Waited::Executor(e) => ::error::from(e),
wait::Waited::Inner(e) => e,
}
})
Expand Down Expand Up @@ -263,7 +262,6 @@ impl Response {
wait::timeout(self.inner.text_with_charset(default_encoding), self.timeout).map_err(|e| {
match e {
wait::Waited::TimedOut => ::error::timedout(None),
wait::Waited::Executor(e) => ::error::from(e),
wait::Waited::Inner(e) => e,
}
})
Expand Down Expand Up @@ -378,7 +376,6 @@ impl Stream for WaitBody {
Some(Err(e)) => {
let req_err = match e {
wait::Waited::TimedOut => ::error::timedout(None),
wait::Waited::Executor(e) => ::error::from(e),
wait::Waited::Inner(e) => e,
};

Expand Down
32 changes: 29 additions & 3 deletions src/wait.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};

use futures::{Async, Future, Poll, Stream};
use futures::executor::{self, Notify};
use tokio_executor::{enter, EnterError};
use tokio_executor;

pub(crate) fn timeout<F>(fut: F, timeout: Option<Duration>) -> Result<F::Item, Waited<F::Error>>
where
Expand All @@ -27,7 +28,6 @@ where S: Stream {
#[derive(Debug)]
pub(crate) enum Waited<E> {
TimedOut,
Executor(EnterError),
Inner(E),
}

Expand Down Expand Up @@ -69,11 +69,37 @@ impl Notify for ThreadNotify {
}
}

static ENTER_WARNED: AtomicBool = AtomicBool::new(false);

fn block_on<F, U, E>(timeout: Option<Duration>, mut poll: F) -> Result<U, Waited<E>>
where
F: FnMut(&Arc<ThreadNotify>) -> Poll<U, E>,
{
let _entered = enter().map_err(Waited::Executor)?;
// Check we're not already inside an `Executor`,
// but for now, only log a warning if so.
let _entered = tokio_executor::enter().map_err(|_| {
if ENTER_WARNED.swap(true, Ordering::Relaxed) {
trace!("warning: blocking API used inside an async Executor");
} else {
// If you're here wondering why you saw this message, it's because
// something used `reqwest::Client`, which is a blocking
// (synchronous) API, while within the context of an asynchronous
// executor. For example, some async server.
//
// Doing this means that while you wait for the synchronous reqwest,
// your server thread will be blocked, not being able to do
// anything else.
//
// The best way to fix it is to use the `reqwest::async::Client`
// instead, and return the futures for the async Executor to run
// cooperatively.
//
// In future versions of reqwest, this will become an Error.
//
// For more: https://github.com/seanmonstar/reqwest/issues/541
warn!("blocking API used inside an async Executor can negatively impact perfomance");
}
});
let deadline = timeout.map(|d| {
Instant::now() + d
});
Expand Down
32 changes: 32 additions & 0 deletions tests/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,3 +354,35 @@ fn body_stream() {

rt.block_on(res_future).unwrap();
}

#[test]
fn blocking_inside_async_context() {
let _ = env_logger::try_init();

let server = server! {
request: b"\
GET /enter HTTP/1.1\r\n\
user-agent: $USERAGENT\r\n\
accept: */*\r\n\
accept-encoding: gzip\r\n\
host: $HOST\r\n\
\r\n\
",
response: b"\
HTTP/1.1 200 OK\r\n\
Content-Length: 18\r\n\
\r\n\
Blocky McBlockface\
"
};

let mut rt = Runtime::new().expect("new rt");
let url = format!("http://{}/enter", server.addr());

rt.block_on(futures::future::lazy(|| {
let mut resp = reqwest::get(&url)?;
let text = resp.text()?;
assert_eq!("Blocky McBlockface", text);
Ok::<_, reqwest::Error>(())
})).unwrap();
}

0 comments on commit 5da0ee6

Please sign in to comment.