Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peek future for Peekable stream #2021

Merged
merged 5 commits into from Jan 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions futures-util/src/stream/mod.rs
Expand Up @@ -14,8 +14,8 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
mod stream;
pub use self::stream::{
Chain, Collect, Concat, Enumerate, Filter, FilterMap, Flatten, Fold, ForEach, Fuse, Inspect,
Map, Next, Peekable, SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeWhile,
Then, Zip,
Map, Next, Peek, Peekable, SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take,
TakeWhile, Then, Zip,
};

#[cfg(feature = "std")]
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/mod.rs
Expand Up @@ -88,7 +88,7 @@ pub use self::select_next_some::SelectNextSome;

mod peek;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::peek::Peekable;
pub use self::peek::{Peek, Peekable};

mod skip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
Expand Down
107 changes: 88 additions & 19 deletions futures-util/src/stream/stream/peek.rs
@@ -1,5 +1,8 @@
use crate::stream::{StreamExt, Fuse};
use crate::future::Either;
use crate::stream::{Fuse, StreamExt};
use core::fmt;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
Expand Down Expand Up @@ -27,7 +30,7 @@ impl<St: Stream> Peekable<St> {
pub(super) fn new(stream: St) -> Peekable<St> {
Peekable {
stream: stream.fuse(),
peeked: None
peeked: None,
}
}

Expand Down Expand Up @@ -63,25 +66,47 @@ impl<St: Stream> Peekable<St> {
self.stream.into_inner()
}

/// Peek retrieves a reference to the next item in the stream.
///
/// This method polls the underlying stream and return either a reference
/// to the next item if the stream is ready or passes through any errors.
pub fn poll_peek(
/// Produces a `Peek` future which retrieves a reference to the next item
/// in the stream, or `None` if the underlying stream terminates.
pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> {
Peek { inner: Some(self) }
}

/// Attempt to poll the underlying stream, and return the mutable borrow
/// in case that is desirable to try for another time.
/// In case a peeking poll is successful, the reference to the next item
/// will be in the `Either::Right` variant; otherwise, the mutable borrow
/// will be in the `Either::Left` variant.
fn do_poll_peek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<&St::Item>> {
) -> Either<Pin<&mut Self>, Option<&St::Item>> {
if self.peeked.is_some() {
let this: &Self = self.into_ref().get_ref();
return Poll::Ready(this.peeked.as_ref())
return Either::Right(this.peeked.as_ref());
}
match ready!(self.as_mut().stream().poll_next(cx)) {
None => Poll::Ready(None),
Some(item) => {
match self.as_mut().stream().poll_next(cx) {
Poll::Ready(None) => Either::Right(None),
Poll::Ready(Some(item)) => {
*self.as_mut().peeked() = Some(item);
let this: &Self = self.into_ref().get_ref();
Poll::Ready(this.peeked.as_ref())
Either::Right(this.peeked.as_ref())
}
_ => Either::Left(self),
}
}

/// Peek retrieves a reference to the next item in the stream.
///
/// This method polls the underlying stream and return either a reference
/// to the next item if the stream is ready or passes through any errors.
pub fn poll_peek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<&St::Item>> {
match self.do_poll_peek(cx) {
Either::Left(_) => Poll::Pending,
Either::Right(poll) => Poll::Ready(poll),
}
}
}
Expand All @@ -95,12 +120,9 @@ impl<St: Stream> FusedStream for Peekable<St> {
impl<S: Stream> Stream for Peekable<S> {
type Item = S::Item;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(item) = self.as_mut().peeked().take() {
return Poll::Ready(Some(item))
return Poll::Ready(Some(item));
}
self.as_mut().stream().poll_next(cx)
}
Expand All @@ -120,9 +142,56 @@ impl<S: Stream> Stream for Peekable<S> {
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for Peekable<S>
where S: Sink<Item> + Stream
where
S: Sink<Item> + Stream,
{
type Error = S::Error;

delegate_sink!(stream, Item);
}

/// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`]
#[must_use = "futures do nothing unless polled"]
pub struct Peek<'a, St: Stream> {
inner: Option<Pin<&'a mut Peekable<St>>>,
}

impl<St: Stream> Unpin for Peek<'_, St> {}

impl<St> fmt::Debug for Peek<'_, St>
where
St: Stream + fmt::Debug,
St::Item: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Peek")
.field("inner", &self.inner)
.finish()
}
}

impl<St: Stream> FusedFuture for Peek<'_, St> {
fn is_terminated(&self) -> bool {
self.inner.is_none()
}
}

impl<'a, St> Future for Peek<'a, St>
where
St: Stream,
{
type Output = Option<&'a St::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(peekable) = self.inner.take() {
match peekable.do_poll_peek(cx) {
Either::Left(peekable) => {
self.inner = Some(peekable);
Poll::Pending
}
Either::Right(peek) => Poll::Ready(peek),
}
} else {
panic!("Peek polled after completion")
}
}
}
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Expand Up @@ -444,7 +444,7 @@ pub mod stream {
StreamExt,
Chain, Collect, Concat, Enumerate, Filter, FilterMap, Flatten, Fold,
Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next,
SelectNextSome, Peekable, Skip, SkipWhile, Take, TakeWhile,
SelectNextSome, Peek, Peekable, Skip, SkipWhile, Take, TakeWhile,
Then, Zip,

TryStreamExt,
Expand Down
13 changes: 13 additions & 0 deletions futures/tests/stream_peekable.rs
@@ -0,0 +1,13 @@
use futures::executor::block_on;
use futures::pin_mut;
use futures::stream::{self, Peekable, StreamExt};

#[test]
fn peekable() {
block_on(async {
let peekable: Peekable<_> = stream::iter(vec![1u8, 2, 3]).peekable();
pin_mut!(peekable);
assert_eq!(peekable.as_mut().peek().await, Some(&1u8));
assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]);
});
}