diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index a3740325f8..ec685b9848 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -19,8 +19,8 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; mod stream; pub use self::stream::{ Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, - Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip, - SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip, + Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome, + Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip, }; #[cfg(feature = "std")] diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index b3b0155f67..86997f45c9 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -131,7 +131,7 @@ pub use self::select_next_some::SelectNextSome; mod peek; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::peek::{NextIf, NextIfEq, Peek, Peekable}; +pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable}; mod skip; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 diff --git a/futures-util/src/stream/stream/peek.rs b/futures-util/src/stream/stream/peek.rs index 217fabaa6e..c72dfc3666 100644 --- a/futures-util/src/stream/stream/peek.rs +++ b/futures-util/src/stream/stream/peek.rs @@ -33,7 +33,7 @@ impl Peekable { delegate_access_inner!(stream, St, (.)); - /// Produces a `Peek` future which retrieves a reference to the next item + /// Produces a future which retrieves a reference to the next item /// in the stream, or `None` if the underlying stream terminates. pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> { Peek { inner: Some(self) } @@ -57,6 +57,54 @@ impl Peekable { }) } + /// Produces a future which retrieves a mutable reference to the next item + /// in the stream, or `None` if the underlying stream terminates. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// use futures::pin_mut; + /// + /// let stream = stream::iter(vec![1, 2, 3]).peekable(); + /// pin_mut!(stream); + /// + /// assert_eq!(stream.as_mut().peek_mut().await, Some(&mut 1)); + /// assert_eq!(stream.as_mut().next().await, Some(1)); + /// + /// // Peek into the stream and modify the value which will be returned next + /// if let Some(p) = stream.as_mut().peek_mut().await { + /// if *p == 2 { + /// *p = 5; + /// } + /// } + /// + /// assert_eq!(stream.collect::>().await, vec![5, 3]); + /// # }); + /// ``` + pub fn peek_mut(self: Pin<&mut Self>) -> PeekMut<'_, St> { + PeekMut { inner: Some(self) } + } + + /// Peek retrieves a mutable reference to the next item in the stream. + pub fn poll_peek_mut( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let mut this = self.project(); + + Poll::Ready(loop { + if this.peeked.is_some() { + break this.peeked.as_mut(); + } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { + *this.peeked = Some(item); + } else { + break None; + } + }) + } + /// Creates a future which will consume and return the next value of this /// stream if a condition is true. /// @@ -220,6 +268,48 @@ where } } +pin_project! { + /// Future for the [`Peekable::peek_mut`](self::Peekable::peek_mut) method. + #[must_use = "futures do nothing unless polled"] + pub struct PeekMut<'a, St: Stream> { + inner: Option>>, + } +} + +impl fmt::Debug for PeekMut<'_, St> +where + St: Stream + fmt::Debug, + St::Item: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PeekMut").field("inner", &self.inner).finish() + } +} + +impl FusedFuture for PeekMut<'_, St> { + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} + +impl<'a, St> Future for PeekMut<'a, St> +where + St: Stream, +{ + type Output = Option<&'a mut St::Item>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = self.project().inner; + if let Some(peekable) = inner { + ready!(peekable.as_mut().poll_peek_mut(cx)); + + inner.take().unwrap().poll_peek_mut(cx) + } else { + panic!("PeekMut polled after completion") + } + } +} + pin_project! { /// Future for the [`Peekable::next_if`](self::Peekable::next_if) method. #[must_use = "futures do nothing unless polled"] diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index e0192a118b..0877e45f4d 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -470,6 +470,13 @@ pub mod future { assert_not_impl!(PollFn<*const ()>: Sync); assert_impl!(PollFn: Unpin); + assert_impl!(PollImmediate: Send); + assert_not_impl!(PollImmediate>: Send); + assert_impl!(PollImmediate: Sync); + assert_not_impl!(PollImmediate>: Sync); + assert_impl!(PollImmediate: Unpin); + assert_not_impl!(PollImmediate: Unpin); + assert_impl!(Ready<()>: Send); assert_not_impl!(Ready<*const ()>: Send); assert_impl!(Ready<()>: Sync); @@ -1430,6 +1437,14 @@ pub mod stream { assert_not_impl!(Peek<'_, LocalStream<()>>: Sync); assert_impl!(Peek<'_, PinnedStream>: Unpin); + assert_impl!(PeekMut<'_, SendStream<()>>: Send); + assert_not_impl!(PeekMut<'_, SendStream>: Send); + assert_not_impl!(PeekMut<'_, LocalStream<()>>: Send); + assert_impl!(PeekMut<'_, SyncStream<()>>: Sync); + assert_not_impl!(PeekMut<'_, SyncStream>: Sync); + assert_not_impl!(PeekMut<'_, LocalStream<()>>: Sync); + assert_impl!(PeekMut<'_, PinnedStream>: Unpin); + assert_impl!(Peekable>: Send); assert_not_impl!(Peekable: Send); assert_not_impl!(Peekable: Send); @@ -1451,6 +1466,13 @@ pub mod stream { assert_not_impl!(PollFn<*const ()>: Sync); assert_impl!(PollFn: Unpin); + assert_impl!(PollImmediate: Send); + assert_not_impl!(PollImmediate>: Send); + assert_impl!(PollImmediate: Sync); + assert_not_impl!(PollImmediate>: Sync); + assert_impl!(PollImmediate: Unpin); + assert_not_impl!(PollImmediate: Unpin); + assert_impl!(ReadyChunks>: Send); assert_not_impl!(ReadyChunks: Send); assert_not_impl!(ReadyChunks: Send); diff --git a/futures/tests/stream_peekable.rs b/futures/tests/stream_peekable.rs index 2fa7f3a4f5..2af20f4fa9 100644 --- a/futures/tests/stream_peekable.rs +++ b/futures/tests/stream_peekable.rs @@ -12,6 +12,20 @@ fn peekable() { }); } +#[test] +fn peekable_mut() { + block_on(async { + let s = stream::iter(vec![1u8, 2, 3]).peekable(); + pin_mut!(s); + if let Some(p) = s.as_mut().peek_mut().await { + if *p == 1 { + *p = 5; + } + } + assert_eq!(s.collect::>().await, vec![5, 2, 3]); + }); +} + #[test] fn peekable_next_if_eq() { block_on(async {