diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 67bb39962a..a352331bd4 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -14,8 +14,8 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; mod stream; pub use self::stream::{ Chain, Collect, Concat, Enumerate, Filter, FilterMap, Flatten, Fold, ForEach, Fuse, Inspect, - Map, Next, Peekable, SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeWhile, - Then, Zip, + Map, Next, Peek, Peekable, SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, + TakeWhile, Then, Zip, }; #[cfg(feature = "std")] diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 62da0f2cb2..44726ca0c7 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -88,7 +88,7 @@ pub use self::select_next_some::SelectNextSome; mod peek; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::peek::Peekable; +pub use self::peek::{Peek, Peekable}; mod skip; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 diff --git a/futures-util/src/stream/stream/peek.rs b/futures-util/src/stream/stream/peek.rs index 2d0b23cdde..9272bafe90 100644 --- a/futures-util/src/stream/stream/peek.rs +++ b/futures-util/src/stream/stream/peek.rs @@ -1,5 +1,8 @@ -use crate::stream::{StreamExt, Fuse}; +use crate::future::Either; +use crate::stream::{Fuse, StreamExt}; +use core::fmt; use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] @@ -27,7 +30,7 @@ impl Peekable { pub(super) fn new(stream: St) -> Peekable { Peekable { stream: stream.fuse(), - peeked: None + peeked: None, } } @@ -63,25 +66,47 @@ impl Peekable { self.stream.into_inner() } - /// Peek retrieves a reference to the next item in the stream. - /// - /// This method polls the underlying stream and return either a reference - /// to the next item if the stream is ready or passes through any errors. - pub fn poll_peek( + /// Produces a `Peek` future which retrieves a reference to the next item + /// in the stream, or `None` if the underlying stream terminates. + pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> { + Peek { inner: Some(self) } + } + + /// Attempt to poll the underlying stream, and return the mutable borrow + /// in case that is desirable to try for another time. + /// In case a peeking poll is successful, the reference to the next item + /// will be in the `Either::Right` variant; otherwise, the mutable borrow + /// will be in the `Either::Left` variant. + fn do_poll_peek( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Either, Option<&St::Item>> { if self.peeked.is_some() { let this: &Self = self.into_ref().get_ref(); - return Poll::Ready(this.peeked.as_ref()) + return Either::Right(this.peeked.as_ref()); } - match ready!(self.as_mut().stream().poll_next(cx)) { - None => Poll::Ready(None), - Some(item) => { + match self.as_mut().stream().poll_next(cx) { + Poll::Ready(None) => Either::Right(None), + Poll::Ready(Some(item)) => { *self.as_mut().peeked() = Some(item); let this: &Self = self.into_ref().get_ref(); - Poll::Ready(this.peeked.as_ref()) + Either::Right(this.peeked.as_ref()) } + _ => Either::Left(self), + } + } + + /// Peek retrieves a reference to the next item in the stream. + /// + /// This method polls the underlying stream and return either a reference + /// to the next item if the stream is ready or passes through any errors. + pub fn poll_peek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.do_poll_peek(cx) { + Either::Left(_) => Poll::Pending, + Either::Right(poll) => Poll::Ready(poll), } } } @@ -95,12 +120,9 @@ impl FusedStream for Peekable { impl Stream for Peekable { type Item = S::Item; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Some(item) = self.as_mut().peeked().take() { - return Poll::Ready(Some(item)) + return Poll::Ready(Some(item)); } self.as_mut().stream().poll_next(cx) } @@ -120,9 +142,56 @@ impl Stream for Peekable { // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for Peekable - where S: Sink + Stream +where + S: Sink + Stream, { type Error = S::Error; delegate_sink!(stream, Item); } + +/// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`] +#[must_use = "futures do nothing unless polled"] +pub struct Peek<'a, St: Stream> { + inner: Option>>, +} + +impl Unpin for Peek<'_, St> {} + +impl fmt::Debug for Peek<'_, St> +where + St: Stream + fmt::Debug, + St::Item: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Peek") + .field("inner", &self.inner) + .finish() + } +} + +impl FusedFuture for Peek<'_, St> { + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} + +impl<'a, St> Future for Peek<'a, St> +where + St: Stream, +{ + type Output = Option<&'a St::Item>; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Some(peekable) = self.inner.take() { + match peekable.do_poll_peek(cx) { + Either::Left(peekable) => { + self.inner = Some(peekable); + Poll::Pending + } + Either::Right(peek) => Poll::Ready(peek), + } + } else { + panic!("Peek polled after completion") + } + } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 84e2a3f9e3..fa11cb9412 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -444,7 +444,7 @@ pub mod stream { StreamExt, Chain, Collect, Concat, Enumerate, Filter, FilterMap, Flatten, Fold, Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next, - SelectNextSome, Peekable, Skip, SkipWhile, Take, TakeWhile, + SelectNextSome, Peek, Peekable, Skip, SkipWhile, Take, TakeWhile, Then, Zip, TryStreamExt, diff --git a/futures/tests/stream_peekable.rs b/futures/tests/stream_peekable.rs new file mode 100644 index 0000000000..b65a0572cb --- /dev/null +++ b/futures/tests/stream_peekable.rs @@ -0,0 +1,13 @@ +use futures::executor::block_on; +use futures::pin_mut; +use futures::stream::{self, Peekable, StreamExt}; + +#[test] +fn peekable() { + block_on(async { + let peekable: Peekable<_> = stream::iter(vec![1u8, 2, 3]).peekable(); + pin_mut!(peekable); + assert_eq!(peekable.as_mut().peek().await, Some(&1u8)); + assert_eq!(peekable.collect::>().await, vec![1, 2, 3]); + }); +}