diff --git a/futures-util/src/stream/stream/buffered.rs b/futures-util/src/stream/stream/buffered.rs index 2445a85c52..3d0c5f5569 100644 --- a/futures-util/src/stream/stream/buffered.rs +++ b/futures-util/src/stream/stream/buffered.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, FuturesOrdered, StreamExt}; +use crate::stream::{Fuse, FuturesOrdered, StreamExt, FusedStream}; use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; @@ -18,6 +18,7 @@ where stream: Fuse, in_progress_queue: FuturesOrdered, max: usize, + is_terminated: bool, } impl Unpin for Buffered @@ -36,10 +37,21 @@ where .field("stream", &self.stream) .field("in_progress_queue", &self.in_progress_queue) .field("max", &self.max) + .field("is_terminated", &self.is_terminated) .finish() } } +impl FusedStream for Buffered +where + St: FusedStream, + St::Item: Future, +{ + fn is_terminated(&self) -> bool { + self.is_terminated + } +} + impl Buffered where St: Stream, @@ -47,12 +59,14 @@ where { unsafe_pinned!(stream: Fuse); unsafe_unpinned!(in_progress_queue: FuturesOrdered); + unsafe_unpinned!(is_terminated: bool); pub(super) fn new(stream: St, n: usize) -> Buffered { Buffered { stream: super::Fuse::new(stream), in_progress_queue: FuturesOrdered::new(), max: n, + is_terminated: false, } } @@ -117,6 +131,9 @@ where // If more values are still coming from the stream, we're not done yet if self.stream.is_done() { + // We yield a `None`, so now the stream is considered terminated. + *self.as_mut().is_terminated() = true; + Poll::Ready(None) } else { Poll::Pending