Skip to content

Commit

Permalink
fix(client): send an error back to client when dispatch misbehaves (f…
Browse files Browse the repository at this point in the history
…ixes #2649)
  • Loading branch information
nox committed Mar 21, 2022
1 parent 57a1d02 commit e895ad1
Showing 1 changed file with 36 additions and 14 deletions.
50 changes: 36 additions & 14 deletions src/client/dispatch.rs
Expand Up @@ -86,7 +86,7 @@ impl<T, U> Sender<T, U> {
}
let (tx, rx) = oneshot::channel();
self.inner
.send(Envelope(Some((val, Callback::Retry(tx)))))
.send(Envelope(Some((val, Callback::Retry(Some(tx))))))
.map(move |_| rx)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}
Expand All @@ -97,7 +97,7 @@ impl<T, U> Sender<T, U> {
}
let (tx, rx) = oneshot::channel();
self.inner
.send(Envelope(Some((val, Callback::NoRetry(tx)))))
.send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
.map(move |_| rx)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}
Expand All @@ -124,7 +124,7 @@ impl<T, U> UnboundedSender<T, U> {
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
let (tx, rx) = oneshot::channel();
self.inner
.send(Envelope(Some((val, Callback::Retry(tx)))))
.send(Envelope(Some((val, Callback::Retry(Some(tx))))))
.map(move |_| rx)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}
Expand Down Expand Up @@ -198,33 +198,55 @@ impl<T, U> Drop for Envelope<T, U> {
}

pub(crate) enum Callback<T, U> {
Retry(oneshot::Sender<Result<U, (crate::Error, Option<T>)>>),
NoRetry(oneshot::Sender<Result<U, crate::Error>>),
Retry(Option<oneshot::Sender<Result<U, (crate::Error, Option<T>)>>>),
NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>),
}

impl<T, U> Drop for Callback<T, U> {
fn drop(&mut self) {
// FIXME(nox): What errors do we want here?
let error = crate::Error::new_canceled().with(if std::thread::panicking() {
"dispatch task panicked"
} else {
"dropped runtime"
});

match self {
Callback::Retry(tx) => if let Some(tx) = tx.take() {
let _ = tx.send(Err((error, None)));
},
Callback::NoRetry(tx) => if let Some(tx) = tx.take() {
let _ = tx.send(Err(error));
},
}
}
}

impl<T, U> Callback<T, U> {
#[cfg(feature = "http2")]
pub(crate) fn is_canceled(&self) -> bool {
match *self {
Callback::Retry(ref tx) => tx.is_closed(),
Callback::NoRetry(ref tx) => tx.is_closed(),
Callback::Retry(Some(ref tx)) => tx.is_closed(),
Callback::NoRetry(Some(ref tx)) => tx.is_closed(),
_ => unreachable!(),
}
}

pub(crate) fn poll_canceled(&mut self, cx: &mut task::Context<'_>) -> Poll<()> {
match *self {
Callback::Retry(ref mut tx) => tx.poll_closed(cx),
Callback::NoRetry(ref mut tx) => tx.poll_closed(cx),
Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx),
Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx),
_ => unreachable!(),
}
}

pub(crate) fn send(self, val: Result<U, (crate::Error, Option<T>)>) {
pub(crate) fn send(mut self, val: Result<U, (crate::Error, Option<T>)>) {
match self {
Callback::Retry(tx) => {
let _ = tx.send(val);
Callback::Retry(ref mut tx) => {
let _ = tx.take().unwrap().send(val);
}
Callback::NoRetry(tx) => {
let _ = tx.send(val.map_err(|e| e.0));
Callback::NoRetry(ref mut tx) => {
let _ = tx.take().unwrap().send(val.map_err(|e| e.0));
}
}
}
Expand Down

0 comments on commit e895ad1

Please sign in to comment.