diff --git a/src/client/conn.rs b/src/client/conn.rs index c379b9a0a9..2911251b10 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -458,8 +458,7 @@ impl Builder { /// Provide an executor to execute background HTTP2 tasks. pub fn executor(&mut self, exec: E) -> &mut Builder where - for<'a> &'a E: tokio_executor::Executor, - E: Send + Sync + 'static, + E: Fn(crate::common::exec::BoxFuture) + Send + Sync + 'static, { self.exec = Exec::Executor(Arc::new(exec)); self diff --git a/src/client/mod.rs b/src/client/mod.rs index 592db82c23..b8ca7ac8fa 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -359,10 +359,7 @@ where C: Connect + Clone + Send + Sync + 'static, drop(delayed_tx); }); - if let Err(err) = executor.execute(on_idle) { - // This task isn't critical, so just log and ignore. - warn!("error spawning task to insert idle connection: {}", err); - } + executor.execute(on_idle); } else { // There's no body to delay, but the connection isn't // ready yet. Only re-insert when it's ready @@ -371,10 +368,7 @@ where C: Connect + Clone + Send + Sync + 'static, }) .map(|_| ()); - if let Err(err) = executor.execute(on_idle) { - // This task isn't critical, so just log and ignore. - warn!("error spawning task to insert idle connection: {}", err); - } + executor.execute(on_idle); } res }))) @@ -513,20 +507,13 @@ where C: Connect + Clone + Send + Sync + 'static, .handshake(io) .and_then(move |(tx, conn)| { trace!("handshake complete, spawning background dispatcher task"); - let bg = executor.execute(conn.map_err(|e| { + executor.execute(conn.map_err(|e| { debug!("client connection error: {}", e) }).map(|_| ())); - // This task is critical, so an execute error - // should be returned. - if let Err(err) = bg { - warn!("error spawning critical client task: {}", err); - return Either::Left(future::err(err)); - } - // Wait for 'conn' to ready up before we // declare this tx as usable - Either::Right(tx.when_ready()) + tx.when_ready() }) .map_ok(move |tx| { pool.pooled(connecting, PoolClient { @@ -1013,8 +1000,7 @@ impl Builder { /// Provide an executor to execute background `Connection` tasks. pub fn executor(&mut self, exec: E) -> &mut Self where - for<'a> &'a E: tokio_executor::Executor, - E: Send + Sync + 'static, + E: Fn(crate::common::exec::BoxFuture) + Send + Sync + 'static, { self.conn_builder.executor(exec); self diff --git a/src/client/pool.rs b/src/client/pool.rs index e13d2245a0..88b0434eb7 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -416,16 +416,11 @@ impl PoolInner { let start = Instant::now() + dur; - let interval = IdleTask { + self.exec.execute(IdleTask { interval: Interval::new(start, dur), pool: WeakOpt::downgrade(pool_ref), pool_drop_notifier: rx, - }; - - if let Err(err) = self.exec.execute(interval) { - // This task isn't critical, so simply log and ignore. - warn!("error spawning connection pool idle interval: {}", err); - } + }); } } diff --git a/src/client/service.rs b/src/client/service.rs index bfb77e1405..d85d3107b6 100644 --- a/src/client/service.rs +++ b/src/client/service.rs @@ -63,7 +63,7 @@ where if let Err(e) = conn.await { debug!("connection error: {:?}", e); } - })?; + }); Ok(sr) }, Err(e) => Err(e) diff --git a/src/common/exec.rs b/src/common/exec.rs index cba551a10f..0975136285 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -3,48 +3,33 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use tokio_executor::{SpawnError, TypedExecutor}; - use crate::body::{Payload, Body}; use crate::proto::h2::server::H2Stream; use crate::server::conn::spawn_all::{NewSvcTask, Watcher}; use crate::service::HttpService; pub trait H2Exec: Clone { - fn execute_h2stream(&mut self, fut: H2Stream) -> crate::Result<()>; + fn execute_h2stream(&self, fut: H2Stream); } pub trait NewSvcExec, E, W: Watcher>: Clone { - fn execute_new_svc(&mut self, fut: NewSvcTask) -> crate::Result<()>; -} - -type BoxFuture = Pin + Send>>; - -pub trait SharedExecutor { - fn shared_spawn(&self, future: BoxFuture) -> Result<(), SpawnError>; + fn execute_new_svc(&self, fut: NewSvcTask); } -impl SharedExecutor for E -where - for<'a> &'a E: tokio_executor::Executor, -{ - fn shared_spawn(mut self: &Self, future: BoxFuture) -> Result<(), SpawnError> { - tokio_executor::Executor::spawn(&mut self, future) - } -} +pub type BoxFuture = Pin + Send>>; // Either the user provides an executor for background tasks, or we use // `tokio::spawn`. #[derive(Clone)] pub enum Exec { Default, - Executor(Arc), + Executor(Arc), } // ===== impl Exec ===== impl Exec { - pub(crate) fn execute(&self, fut: F) -> crate::Result<()> + pub(crate) fn execute(&self, fut: F) where F: Future + Send + 'static, { @@ -52,34 +37,7 @@ impl Exec { Exec::Default => { #[cfg(feature = "tcp")] { - use std::error::Error as StdError; - - struct TokioSpawnError; - - impl fmt::Debug for TokioSpawnError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Debug::fmt("tokio::spawn failed (is a tokio runtime running this future?)", f) - } - } - - impl fmt::Display for TokioSpawnError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Display::fmt("tokio::spawn failed (is a tokio runtime running this future?)", f) - } - } - - impl StdError for TokioSpawnError { - fn description(&self) -> &str { - "tokio::spawn failed" - } - } - - ::tokio_executor::DefaultExecutor::current() - .spawn(Box::pin(fut)) - .map_err(|err| { - warn!("executor error: {:?}", err); - crate::Error::new_execute(TokioSpawnError) - }) + tokio::spawn(fut); } #[cfg(not(feature = "tcp"))] { @@ -88,11 +46,7 @@ impl Exec { } }, Exec::Executor(ref e) => { - e.shared_spawn(Box::pin(fut)) - .map_err(|err| { - warn!("executor error: {:?}", err); - crate::Error::new_execute("custom executor failed") - }) + e(Box::pin(fut)); }, } } @@ -100,8 +54,10 @@ impl Exec { impl fmt::Debug for Exec { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Exec") - .finish() + match self { + Exec::Default => f.write_str("Exec::Default"), + Exec::Executor(..) => f.write_str("Exec::Custom"), + } } } @@ -111,7 +67,7 @@ where H2Stream: Future + Send + 'static, B: Payload, { - fn execute_h2stream(&mut self, fut: H2Stream) -> crate::Result<()> { + fn execute_h2stream(&self, fut: H2Stream) { self.execute(fut) } } @@ -122,7 +78,7 @@ where S: HttpService, W: Watcher, { - fn execute_new_svc(&mut self, fut: NewSvcTask) -> crate::Result<()> { + fn execute_new_svc(&self, fut: NewSvcTask) { self.execute(fut) } } @@ -131,34 +87,23 @@ where impl H2Exec for E where - E: TypedExecutor> + Clone, + E: Fn(H2Stream) + Clone, H2Stream: Future, B: Payload, { - fn execute_h2stream(&mut self, fut: H2Stream) -> crate::Result<()> { - self.spawn(fut) - .map_err(|err| { - warn!("executor error: {:?}", err); - crate::Error::new_execute("custom executor failed") - }) + fn execute_h2stream(&self, fut: H2Stream) { + self(fut); } } impl NewSvcExec for E where - E: TypedExecutor> + Clone, + E: Fn(NewSvcTask) + Clone, NewSvcTask: Future, S: HttpService, W: Watcher, { - fn execute_new_svc(&mut self, fut: NewSvcTask) -> crate::Result<()> { - self.spawn(fut) - .map_err(|err| { - warn!("executor error: {:?}", err); - crate::Error::new_execute("custom executor failed") - }) + fn execute_new_svc(&self, fut: NewSvcTask) { + self(fut); } } - -// ===== StdError impls ===== - diff --git a/src/error.rs b/src/error.rs index b57e6e312e..42dd173a1b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -86,12 +86,8 @@ pub(crate) enum User { /// User tried polling for an upgrade that doesn't exist. NoUpgrade, - /// User polled for an upgrade, but low-level API is not using upgrades. ManualUpgrade, - - /// Error trying to call `Executor::execute`. - Execute, } impl Error { @@ -277,10 +273,6 @@ impl Error { Error::new(Kind::Shutdown).with(cause) } - pub(crate) fn new_execute>(cause: E) -> Error { - Error::new_user(User::Execute).with(cause) - } - pub(crate) fn new_h2(cause: ::h2::Error) -> Error { if cause.is_io() { Error::new_io(cause.into_io().expect("h2::Error::is_io")) @@ -346,7 +338,6 @@ impl StdError for Error { Kind::User(User::AbsoluteUriRequired) => "client requires absolute-form URIs", Kind::User(User::NoUpgrade) => "no upgrade available", Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use", - Kind::User(User::Execute) => "executor failed to spawn task", } } diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 6b7760f60e..cb150d34b8 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -71,7 +71,7 @@ where } }; - exec.execute(conn_task)?; + exec.execute(conn_task); Ok(ClientTask { conn_drop_ref, @@ -155,7 +155,7 @@ where drop(conn_drop_ref); x }); - self.executor.execute(pipe)?; + self.executor.execute(pipe); } } } @@ -175,7 +175,7 @@ where } } }); - self.executor.execute(cb.send_when(fut))?; + self.executor.execute(cb.send_when(fut)); continue; }, diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 3e765d9e98..c32bc5d3bc 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -175,7 +175,7 @@ where crate::Body::h2(stream, content_length) }); let fut = H2Stream::new(service.call(req), respond); - exec.execute_h2stream(fut)?; + exec.execute_h2stream(fut); }, Some(Err(e)) => { return Poll::Ready(Err(crate::Error::new_h2(e))); diff --git a/src/server/conn.rs b/src/server/conn.rs index dcb3e2e8b4..8a1b4320df 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -843,7 +843,7 @@ where loop { if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) { let fut = NewSvcTask::new(connecting, watcher.clone()); - me.serve.as_mut().project().protocol.exec.execute_new_svc(fut)?; + me.serve.as_mut().project().protocol.exec.execute_new_svc(fut); } else { return Poll::Ready(Ok(())); }