diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 1d2b87eb00..e3f9a4b330 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -86,7 +86,7 @@ impl Sender { } let (tx, rx) = oneshot::channel(); self.inner - .send(Envelope(Some((val, Callback::Retry(tx))))) + .send(Envelope(Some((val, Callback::Retry(Some(tx)))))) .map(move |_| rx) .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) } @@ -97,7 +97,7 @@ impl Sender { } let (tx, rx) = oneshot::channel(); self.inner - .send(Envelope(Some((val, Callback::NoRetry(tx))))) + .send(Envelope(Some((val, Callback::NoRetry(Some(tx)))))) .map(move |_| rx) .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) } @@ -124,7 +124,7 @@ impl UnboundedSender { pub(crate) fn try_send(&mut self, val: T) -> Result, T> { let (tx, rx) = oneshot::channel(); self.inner - .send(Envelope(Some((val, Callback::Retry(tx))))) + .send(Envelope(Some((val, Callback::Retry(Some(tx)))))) .map(move |_| rx) .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) } @@ -198,33 +198,55 @@ impl Drop for Envelope { } pub(crate) enum Callback { - Retry(oneshot::Sender)>>), - NoRetry(oneshot::Sender>), + Retry(Option)>>>), + NoRetry(Option>>), +} + +impl Drop for Callback { + fn drop(&mut self) { + // FIXME(nox): What errors do we want here? + let error = crate::Error::new_canceled().with(if std::thread::panicking() { + "dispatch task panicked" + } else { + "dropped runtime" + }); + + match self { + Callback::Retry(tx) => if let Some(tx) = tx.take() { + let _ = tx.send(Err((error, None))); + }, + Callback::NoRetry(tx) => if let Some(tx) = tx.take() { + let _ = tx.send(Err(error)); + }, + } + } } impl Callback { #[cfg(feature = "http2")] pub(crate) fn is_canceled(&self) -> bool { match *self { - Callback::Retry(ref tx) => tx.is_closed(), - Callback::NoRetry(ref tx) => tx.is_closed(), + Callback::Retry(Some(ref tx)) => tx.is_closed(), + Callback::NoRetry(Some(ref tx)) => tx.is_closed(), + _ => unreachable!(), } } pub(crate) fn poll_canceled(&mut self, cx: &mut task::Context<'_>) -> Poll<()> { match *self { - Callback::Retry(ref mut tx) => tx.poll_closed(cx), - Callback::NoRetry(ref mut tx) => tx.poll_closed(cx), + Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx), + Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx), + _ => unreachable!(), } } - pub(crate) fn send(self, val: Result)>) { + pub(crate) fn send(mut self, val: Result)>) { match self { - Callback::Retry(tx) => { - let _ = tx.send(val); + Callback::Retry(ref mut tx) => { + let _ = tx.take().unwrap().send(val); } - Callback::NoRetry(tx) => { - let _ = tx.send(val.map_err(|e| e.0)); + Callback::NoRetry(ref mut tx) => { + let _ = tx.take().unwrap().send(val.map_err(|e| e.0)); } } }