Skip to content

Commit

Permalink
Added FusedStream impl to Buffered
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-airoldie committed Dec 23, 2019
1 parent 3c09c69 commit db02461
Showing 1 changed file with 18 additions and 1 deletion.
19 changes: 18 additions & 1 deletion futures-util/src/stream/stream/buffered.rs
@@ -1,4 +1,4 @@
use crate::stream::{Fuse, FuturesOrdered, StreamExt};
use crate::stream::{Fuse, FuturesOrdered, StreamExt, FusedStream};
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
Expand All @@ -18,6 +18,7 @@ where
stream: Fuse<St>,
in_progress_queue: FuturesOrdered<St::Item>,
max: usize,
is_terminated: bool,
}

impl<St> Unpin for Buffered<St>
Expand All @@ -36,23 +37,36 @@ where
.field("stream", &self.stream)
.field("in_progress_queue", &self.in_progress_queue)
.field("max", &self.max)
.field("is_terminated", &self.is_terminated)
.finish()
}
}

impl<St> FusedStream for Buffered<St>
where
St: FusedStream,
St::Item: Future,
{
fn is_terminated(&self) -> bool {
self.is_terminated
}
}

impl<St> Buffered<St>
where
St: Stream,
St::Item: Future,
{
unsafe_pinned!(stream: Fuse<St>);
unsafe_unpinned!(in_progress_queue: FuturesOrdered<St::Item>);
unsafe_unpinned!(is_terminated: bool);

pub(super) fn new(stream: St, n: usize) -> Buffered<St> {
Buffered {
stream: super::Fuse::new(stream),
in_progress_queue: FuturesOrdered::new(),
max: n,
is_terminated: false,
}
}

Expand Down Expand Up @@ -117,6 +131,9 @@ where

// If more values are still coming from the stream, we're not done yet
if self.stream.is_done() {
// We yield a `None`, so now the stream is considered terminated.
*self.as_mut().is_terminated() = true;

Poll::Ready(None)
} else {
Poll::Pending
Expand Down

0 comments on commit db02461

Please sign in to comment.