diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index b4dda4729..66bc14e8a 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -39,6 +39,7 @@ impl Unpin for Streaming {} enum State { ReadHeader, ReadBody { compression: bool, len: usize }, + Error, } #[derive(Debug)] @@ -311,6 +312,10 @@ impl Stream for Streaming { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { + if let State::Error = &self.state { + return Poll::Ready(None); + } + // FIXME: implement the ability to poll trailers when we _know_ that // the consumer of this stream will only poll for the first message. // This means we skip the poll_trailers step. @@ -321,6 +326,7 @@ impl Stream for Streaming { let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) { Some(Ok(d)) => Some(d), Some(Err(e)) => { + let _ = std::mem::replace(&mut self.state, State::Error); let err: crate::Error = e.into(); debug!("decoder inner stream error: {:?}", err); let status = Status::from_error(err);