Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Peekable::{peek_mut, poll_peek_mut} #2488

Merged
merged 1 commit into from Aug 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions futures-util/src/stream/mod.rs
Expand Up @@ -19,9 +19,9 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
mod stream;
pub use self::stream::{
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip,
SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold, TryForEach,
Unzip, Zip,
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome,
Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold,
TryForEach, Unzip, Zip,
};

#[cfg(feature = "std")]
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/mod.rs
Expand Up @@ -131,7 +131,7 @@ pub use self::select_next_some::SelectNextSome;

mod peek;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::peek::{NextIf, NextIfEq, Peek, Peekable};
pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable};

mod skip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
Expand Down
92 changes: 91 additions & 1 deletion futures-util/src/stream/stream/peek.rs
Expand Up @@ -33,7 +33,7 @@ impl<St: Stream> Peekable<St> {

delegate_access_inner!(stream, St, (.));

/// Produces a `Peek` future which retrieves a reference to the next item
/// Produces a future which retrieves a reference to the next item
/// in the stream, or `None` if the underlying stream terminates.
pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> {
Peek { inner: Some(self) }
Expand All @@ -57,6 +57,54 @@ impl<St: Stream> Peekable<St> {
})
}

/// Produces a future which retrieves a mutable reference to the next item
/// in the stream, or `None` if the underlying stream terminates.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
/// use futures::pin_mut;
///
/// let stream = stream::iter(vec![1, 2, 3]).peekable();
/// pin_mut!(stream);
///
/// assert_eq!(stream.as_mut().peek_mut().await, Some(&mut 1));
/// assert_eq!(stream.as_mut().next().await, Some(1));
///
/// // Peek into the stream and modify the value which will be returned next
/// if let Some(p) = stream.as_mut().peek_mut().await {
/// if *p == 2 {
/// *p = 5;
/// }
/// }
///
/// assert_eq!(stream.collect::<Vec<_>>().await, vec![5, 3]);
/// # });
/// ```
pub fn peek_mut(self: Pin<&mut Self>) -> PeekMut<'_, St> {
PeekMut { inner: Some(self) }
}

/// Peek retrieves a mutable reference to the next item in the stream.
pub fn poll_peek_mut(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<&mut St::Item>> {
let mut this = self.project();

Poll::Ready(loop {
if this.peeked.is_some() {
break this.peeked.as_mut();
} else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
*this.peeked = Some(item);
} else {
break None;
}
})
}

/// Creates a future which will consume and return the next value of this
/// stream if a condition is true.
///
Expand Down Expand Up @@ -220,6 +268,48 @@ where
}
}

pin_project! {
/// Future for the [`Peekable::peek_mut`](self::Peekable::peek_mut) method.
#[must_use = "futures do nothing unless polled"]
pub struct PeekMut<'a, St: Stream> {
inner: Option<Pin<&'a mut Peekable<St>>>,
}
}

impl<St> fmt::Debug for PeekMut<'_, St>
where
St: Stream + fmt::Debug,
St::Item: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PeekMut").field("inner", &self.inner).finish()
}
}

impl<St: Stream> FusedFuture for PeekMut<'_, St> {
fn is_terminated(&self) -> bool {
self.inner.is_none()
}
}

impl<'a, St> Future for PeekMut<'a, St>
where
St: Stream,
{
type Output = Option<&'a mut St::Item>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
if let Some(peekable) = inner {
ready!(peekable.as_mut().poll_peek_mut(cx));

inner.take().unwrap().poll_peek_mut(cx)
} else {
panic!("PeekMut polled after completion")
}
}
}

pin_project! {
/// Future for the [`Peekable::next_if`](self::Peekable::next_if) method.
#[must_use = "futures do nothing unless polled"]
Expand Down
22 changes: 22 additions & 0 deletions futures/tests/auto_traits.rs
Expand Up @@ -470,6 +470,13 @@ pub mod future {
assert_not_impl!(PollFn<*const ()>: Sync);
assert_impl!(PollFn<PhantomPinned>: Unpin);

assert_impl!(PollImmediate<SendStream>: Send);
assert_not_impl!(PollImmediate<LocalStream<()>>: Send);
assert_impl!(PollImmediate<SyncStream>: Sync);
assert_not_impl!(PollImmediate<LocalStream<()>>: Sync);
assert_impl!(PollImmediate<UnpinStream>: Unpin);
assert_not_impl!(PollImmediate<PinnedStream>: Unpin);

assert_impl!(Ready<()>: Send);
assert_not_impl!(Ready<*const ()>: Send);
assert_impl!(Ready<()>: Sync);
Expand Down Expand Up @@ -1430,6 +1437,14 @@ pub mod stream {
assert_not_impl!(Peek<'_, LocalStream<()>>: Sync);
assert_impl!(Peek<'_, PinnedStream>: Unpin);

assert_impl!(PeekMut<'_, SendStream<()>>: Send);
assert_not_impl!(PeekMut<'_, SendStream>: Send);
assert_not_impl!(PeekMut<'_, LocalStream<()>>: Send);
assert_impl!(PeekMut<'_, SyncStream<()>>: Sync);
assert_not_impl!(PeekMut<'_, SyncStream>: Sync);
assert_not_impl!(PeekMut<'_, LocalStream<()>>: Sync);
assert_impl!(PeekMut<'_, PinnedStream>: Unpin);

assert_impl!(Peekable<SendStream<()>>: Send);
assert_not_impl!(Peekable<SendStream>: Send);
assert_not_impl!(Peekable<LocalStream>: Send);
Expand All @@ -1451,6 +1466,13 @@ pub mod stream {
assert_not_impl!(PollFn<*const ()>: Sync);
assert_impl!(PollFn<PhantomPinned>: Unpin);

assert_impl!(PollImmediate<SendStream>: Send);
assert_not_impl!(PollImmediate<LocalStream<()>>: Send);
assert_impl!(PollImmediate<SyncStream>: Sync);
assert_not_impl!(PollImmediate<LocalStream<()>>: Sync);
assert_impl!(PollImmediate<UnpinStream>: Unpin);
assert_not_impl!(PollImmediate<PinnedStream>: Unpin);

assert_impl!(ReadyChunks<SendStream<()>>: Send);
assert_not_impl!(ReadyChunks<SendStream>: Send);
assert_not_impl!(ReadyChunks<LocalStream>: Send);
Expand Down
14 changes: 14 additions & 0 deletions futures/tests/stream_peekable.rs
Expand Up @@ -12,6 +12,20 @@ fn peekable() {
});
}

#[test]
fn peekable_mut() {
block_on(async {
let s = stream::iter(vec![1u8, 2, 3]).peekable();
pin_mut!(s);
if let Some(p) = s.as_mut().peek_mut().await {
if *p == 1 {
*p = 5;
}
}
assert_eq!(s.collect::<Vec<_>>().await, vec![5, 2, 3]);
});
}

#[test]
fn peekable_next_if_eq() {
block_on(async {
Expand Down