From daaed2991a3cfea77f2c9668aa5218d4b1e53618 Mon Sep 17 00:00:00 2001 From: Ding Xiang Fei Date: Fri, 3 Jan 2020 17:39:38 +0800 Subject: [PATCH 1/5] `Peek` future for `Peekable` stream --- futures-util/src/stream/mod.rs | 4 +- futures-util/src/stream/stream/mod.rs | 2 +- futures-util/src/stream/stream/peek.rs | 77 +++++++++++++++++++++----- futures/src/lib.rs | 2 +- 4 files changed, 66 insertions(+), 19 deletions(-) diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 67bb39962a..a352331bd4 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -14,8 +14,8 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; mod stream; pub use self::stream::{ Chain, Collect, Concat, Enumerate, Filter, FilterMap, Flatten, Fold, ForEach, Fuse, Inspect, - Map, Next, Peekable, SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeWhile, - Then, Zip, + Map, Next, Peek, Peekable, SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, + TakeWhile, Then, Zip, }; #[cfg(feature = "std")] diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 62da0f2cb2..44726ca0c7 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -88,7 +88,7 @@ pub use self::select_next_some::SelectNextSome; mod peek; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::peek::Peekable; +pub use self::peek::{Peek, Peekable}; mod skip; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 diff --git a/futures-util/src/stream/stream/peek.rs b/futures-util/src/stream/stream/peek.rs index 2d0b23cdde..6637f6c2f2 100644 --- a/futures-util/src/stream/stream/peek.rs +++ b/futures-util/src/stream/stream/peek.rs @@ -1,10 +1,13 @@ -use crate::stream::{StreamExt, Fuse}; +use crate::future::Either; +use crate::stream::{Fuse, StreamExt}; use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use std::fmt; /// A `Stream` that implements a `peek` method. /// @@ -27,7 +30,7 @@ impl Peekable { pub(super) fn new(stream: St) -> Peekable { Peekable { stream: stream.fuse(), - peeked: None + peeked: None, } } @@ -67,21 +70,26 @@ impl Peekable { /// /// This method polls the underlying stream and return either a reference /// to the next item if the stream is ready or passes through any errors. - pub fn poll_peek( + pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> { + Peek { inner: Some(self) } + } + + fn poll_peek( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Either, Poll>> { if self.peeked.is_some() { let this: &Self = self.into_ref().get_ref(); - return Poll::Ready(this.peeked.as_ref()) + return Either::Right(Poll::Ready(this.peeked.as_ref())); } - match ready!(self.as_mut().stream().poll_next(cx)) { - None => Poll::Ready(None), - Some(item) => { + match self.as_mut().stream().poll_next(cx) { + Poll::Ready(None) => Either::Right(Poll::Ready(None)), + Poll::Ready(Some(item)) => { *self.as_mut().peeked() = Some(item); let this: &Self = self.into_ref().get_ref(); - Poll::Ready(this.peeked.as_ref()) + Either::Right(Poll::Ready(this.peeked.as_ref())) } + _ => Either::Left(self), } } } @@ -95,12 +103,9 @@ impl FusedStream for Peekable { impl Stream for Peekable { type Item = S::Item; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Some(item) = self.as_mut().peeked().take() { - return Poll::Ready(Some(item)) + return Poll::Ready(Some(item)); } self.as_mut().stream().poll_next(cx) } @@ -120,9 +125,51 @@ impl Stream for Peekable { // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for Peekable - where S: Sink + Stream +where + S: Sink + Stream, { type Error = S::Error; delegate_sink!(stream, Item); } + +/// Future for the [`peek()`] function from [`Peekable`] +#[must_use = "futures do nothing unless polled"] +pub struct Peek<'a, St: Stream> { + inner: Option>>, +} + +impl Unpin for Peek<'_, St> {} + +impl fmt::Debug for Peek<'_, St> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Peek") + .finish() + } +} + +impl FusedFuture for Peek<'_, St> { + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} + +impl<'a, St> Future for Peek<'a, St> +where + St: Stream, +{ + type Output = Option<&'a St::Item>; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Some(peekable) = self.inner.take() { + match peekable.poll_peek(cx) { + Either::Left(peekable) => { + self.inner = Some(peekable); + Poll::Pending + } + Either::Right(poll) => poll, + } + } else { + Poll::Pending + } + } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 84e2a3f9e3..fa11cb9412 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -444,7 +444,7 @@ pub mod stream { StreamExt, Chain, Collect, Concat, Enumerate, Filter, FilterMap, Flatten, Fold, Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next, - SelectNextSome, Peekable, Skip, SkipWhile, Take, TakeWhile, + SelectNextSome, Peek, Peekable, Skip, SkipWhile, Take, TakeWhile, Then, Zip, TryStreamExt, From f9b0cb6765c72935ee3546b7e7fcc671ccac2e6b Mon Sep 17 00:00:00 2001 From: Ding Xiang Fei Date: Fri, 3 Jan 2020 17:51:40 +0800 Subject: [PATCH 2/5] use core::fmt --- futures-util/src/stream/stream/peek.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/src/stream/stream/peek.rs b/futures-util/src/stream/stream/peek.rs index 6637f6c2f2..a142ec010d 100644 --- a/futures-util/src/stream/stream/peek.rs +++ b/futures-util/src/stream/stream/peek.rs @@ -1,5 +1,6 @@ use crate::future::Either; use crate::stream::{Fuse, StreamExt}; +use core::fmt; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, Stream}; @@ -7,7 +8,6 @@ use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_utils::{unsafe_pinned, unsafe_unpinned}; -use std::fmt; /// A `Stream` that implements a `peek` method. /// From e5c60ad95ae7562f6a5a8204c3e3318d0372d353 Mon Sep 17 00:00:00 2001 From: Ding Xiang Fei Date: Sun, 5 Jan 2020 19:30:39 +0800 Subject: [PATCH 3/5] reinstate the previous public interface; document the interpretation of return values --- futures-util/src/stream/stream/peek.rs | 48 +++++++++++++++++++------- 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/futures-util/src/stream/stream/peek.rs b/futures-util/src/stream/stream/peek.rs index a142ec010d..c17f0fb698 100644 --- a/futures-util/src/stream/stream/peek.rs +++ b/futures-util/src/stream/stream/peek.rs @@ -66,32 +66,49 @@ impl Peekable { self.stream.into_inner() } - /// Peek retrieves a reference to the next item in the stream. - /// - /// This method polls the underlying stream and return either a reference - /// to the next item if the stream is ready or passes through any errors. + /// Produces a `Peek` future which retrieves a reference to the next item + /// in the stream, or `None` if the underlying stream terminates. pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> { Peek { inner: Some(self) } } - fn poll_peek( + /// Attempt to poll the underlying stream, and return the mutable borrow + /// in case that is desirable to try for another time. + /// In case a peeking poll is successful, the reference to the next item + /// will be in the `Either::Right` variant; otherwise, the mutable borrow + /// will be in the `Either::Left` variant. + fn do_poll_peek( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Either, Poll>> { + ) -> Either, Option<&St::Item>> { if self.peeked.is_some() { let this: &Self = self.into_ref().get_ref(); - return Either::Right(Poll::Ready(this.peeked.as_ref())); + return Either::Right(this.peeked.as_ref()); } match self.as_mut().stream().poll_next(cx) { - Poll::Ready(None) => Either::Right(Poll::Ready(None)), + Poll::Ready(None) => Either::Right(None), Poll::Ready(Some(item)) => { *self.as_mut().peeked() = Some(item); let this: &Self = self.into_ref().get_ref(); - Either::Right(Poll::Ready(this.peeked.as_ref())) + Either::Right(this.peeked.as_ref()) } _ => Either::Left(self), } } + + /// Peek retrieves a reference to the next item in the stream. + /// + /// This method polls the underlying stream and return either a reference + /// to the next item if the stream is ready or passes through any errors. + pub fn poll_peek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.do_poll_peek(cx) { + Either::Left(_) => Poll::Pending, + Either::Right(poll) => Poll::Ready(poll), + } + } } impl FusedStream for Peekable { @@ -141,9 +158,14 @@ pub struct Peek<'a, St: Stream> { impl Unpin for Peek<'_, St> {} -impl fmt::Debug for Peek<'_, St> { +impl fmt::Debug for Peek<'_, St> +where + St: Stream + fmt::Debug, + St::Item: fmt::Debug, +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Peek") + .field("inner", &self.inner) .finish() } } @@ -161,15 +183,15 @@ where type Output = Option<&'a St::Item>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if let Some(peekable) = self.inner.take() { - match peekable.poll_peek(cx) { + match peekable.do_poll_peek(cx) { Either::Left(peekable) => { self.inner = Some(peekable); Poll::Pending } - Either::Right(poll) => poll, + Either::Right(peek) => Poll::Ready(peek), } } else { - Poll::Pending + panic!("Peek polled after completion") } } } From f753463ef82456b8c177097435100f0346787c2c Mon Sep 17 00:00:00 2001 From: Ding Xiang Fei Date: Wed, 8 Jan 2020 15:59:44 +0800 Subject: [PATCH 4/5] examplar use of `Peekable::peek` --- futures/tests/stream_peekable.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 futures/tests/stream_peekable.rs diff --git a/futures/tests/stream_peekable.rs b/futures/tests/stream_peekable.rs new file mode 100644 index 0000000000..b65a0572cb --- /dev/null +++ b/futures/tests/stream_peekable.rs @@ -0,0 +1,13 @@ +use futures::executor::block_on; +use futures::pin_mut; +use futures::stream::{self, Peekable, StreamExt}; + +#[test] +fn peekable() { + block_on(async { + let peekable: Peekable<_> = stream::iter(vec![1u8, 2, 3]).peekable(); + pin_mut!(peekable); + assert_eq!(peekable.as_mut().peek().await, Some(&1u8)); + assert_eq!(peekable.collect::>().await, vec![1, 2, 3]); + }); +} From bf232dd68a9beaca65cca2f36d60973e464c9b35 Mon Sep 17 00:00:00 2001 From: Ding Xiang Fei Date: Thu, 9 Jan 2020 14:45:28 +0800 Subject: [PATCH 5/5] rectify cargo doc reference --- futures-util/src/stream/stream/peek.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/src/stream/stream/peek.rs b/futures-util/src/stream/stream/peek.rs index c17f0fb698..9272bafe90 100644 --- a/futures-util/src/stream/stream/peek.rs +++ b/futures-util/src/stream/stream/peek.rs @@ -150,7 +150,7 @@ where delegate_sink!(stream, Item); } -/// Future for the [`peek()`] function from [`Peekable`] +/// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`] #[must_use = "futures do nothing unless polled"] pub struct Peek<'a, St: Stream> { inner: Option>>,