Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream::Peekable::{next_if, next_if_eq} #2379

Merged
merged 1 commit into from Apr 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion futures-util/src/stream/mod.rs
Expand Up @@ -19,7 +19,7 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
mod stream;
pub use self::stream::{
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
Fuse, Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
};

Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/mod.rs
Expand Up @@ -123,7 +123,7 @@ pub use self::select_next_some::SelectNextSome;

mod peek;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::peek::{Peek, Peekable};
pub use self::peek::{Peek, Peekable, NextIf, NextIfEq};

mod skip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
Expand Down
228 changes: 220 additions & 8 deletions futures-util/src/stream/stream/peek.rs
@@ -1,5 +1,7 @@
use crate::fns::FnOnce1;
use crate::stream::{Fuse, StreamExt};
use core::fmt;
use core::marker::PhantomData;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::ready;
Expand Down Expand Up @@ -44,10 +46,7 @@ impl<St: Stream> Peekable<St> {
///
/// This method polls the underlying stream and return either a reference
/// to the next item if the stream is ready or passes through any errors.
pub fn poll_peek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<&St::Item>> {
pub fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&St::Item>> {
let mut this = self.project();

Poll::Ready(loop {
Expand All @@ -60,6 +59,96 @@ impl<St: Stream> Peekable<St> {
}
})
}

/// Creates a future which will consume and return the next value of this
/// stream if a condition is true.
///
/// If `func` returns `true` for the next value of this stream, consume and
/// return it. Otherwise, return `None`.
///
/// # Examples
///
/// Consume a number if it's equal to 0.
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
/// use futures::pin_mut;
///
/// let stream = stream::iter(0..5).peekable();
/// pin_mut!(stream);
/// // The first item of the stream is 0; consume it.
/// assert_eq!(stream.as_mut().next_if(|&x| x == 0).await, Some(0));
/// // The next item returned is now 1, so `consume` will return `false`.
/// assert_eq!(stream.as_mut().next_if(|&x| x == 0).await, None);
/// // `next_if` saves the value of the next item if it was not equal to `expected`.
/// assert_eq!(stream.next().await, Some(1));
/// # });
/// ```
///
/// Consume any number less than 10.
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
/// use futures::pin_mut;
///
/// let stream = stream::iter(1..20).peekable();
/// pin_mut!(stream);
/// // Consume all numbers less than 10
/// while stream.as_mut().next_if(|&x| x < 10).await.is_some() {}
/// // The next value returned will be 10
/// assert_eq!(stream.next().await, Some(10));
/// # });
/// ```
pub fn next_if<F>(self: Pin<&mut Self>, func: F) -> NextIf<'_, St, F>
where
F: FnOnce(&St::Item) -> bool,
{
NextIf {
inner: Some((self, func)),
}
}

/// Creates a future which will consume and return the next item if it is
/// equal to `expected`.
///
/// # Example
///
/// Consume a number if it's equal to 0.
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
/// use futures::pin_mut;
///
/// let stream = stream::iter(0..5).peekable();
/// pin_mut!(stream);
/// // The first item of the stream is 0; consume it.
/// assert_eq!(stream.as_mut().next_if_eq(&0).await, Some(0));
/// // The next item returned is now 1, so `consume` will return `false`.
/// assert_eq!(stream.as_mut().next_if_eq(&0).await, None);
/// // `next_if_eq` saves the value of the next item if it was not equal to `expected`.
/// assert_eq!(stream.next().await, Some(1));
/// # });
/// ```
pub fn next_if_eq<'a, T>(self: Pin<&'a mut Self>, expected: &'a T) -> NextIfEq<'a, St, T>
where
T: ?Sized,
St::Item: PartialEq<T>,
{
NextIfEq {
inner: NextIf {
inner: Some((
self,
NextIfEqFn {
expected,
_next: PhantomData,
},
)),
},
}
}
}

impl<St: Stream> FusedStream for Peekable<St> {
Expand Down Expand Up @@ -103,7 +192,7 @@ where
}

pin_project! {
/// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`]
/// Future for the [`Peekable::peek`](self::Peekable::peek) method.
#[must_use = "futures do nothing unless polled"]
pub struct Peek<'a, St: Stream> {
inner: Option<Pin<&'a mut Peekable<St>>>,
Expand All @@ -116,9 +205,7 @@ where
St::Item: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Peek")
.field("inner", &self.inner)
.finish()
f.debug_struct("Peek").field("inner", &self.inner).finish()
}
}

Expand All @@ -133,6 +220,7 @@ where
St: Stream,
{
type Output = Option<&'a St::Item>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
if let Some(peekable) = inner {
Expand All @@ -144,3 +232,127 @@ where
}
}
}

pin_project! {
/// Future for the [`Peekable::next_if`](self::Peekable::next_if) method.
#[must_use = "futures do nothing unless polled"]
pub struct NextIf<'a, St: Stream, F> {
inner: Option<(Pin<&'a mut Peekable<St>>, F)>,
}
}

impl<St, F> fmt::Debug for NextIf<'_, St, F>
where
St: Stream + fmt::Debug,
St::Item: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NextIf")
.field("inner", &self.inner.as_ref().map(|(s, _f)| s))
.finish()
}
}

#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
impl<St, F> FusedFuture for NextIf<'_, St, F>
where
St: Stream,
F: for<'a> FnOnce1<&'a St::Item, Output = bool>,
{
fn is_terminated(&self) -> bool {
self.inner.is_none()
}
}

#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
impl<St, F> Future for NextIf<'_, St, F>
where
St: Stream,
F: for<'a> FnOnce1<&'a St::Item, Output = bool>,
{
type Output = Option<St::Item>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
if let Some((peekable, _)) = inner {
let res = ready!(peekable.as_mut().poll_next(cx));

let (peekable, func) = inner.take().unwrap();
match res {
Some(ref matched) if func.call_once(matched) => Poll::Ready(res),
other => {
let peekable = peekable.project();
// Since we called `self.next()`, we consumed `self.peeked`.
assert!(peekable.peeked.is_none());
*peekable.peeked = other;
Poll::Ready(None)
}
}
} else {
panic!("NextIf polled after completion")
}
}
}

pin_project! {
/// Future for the [`Peekable::next_if_eq`](self::Peekable::next_if_eq) method.
#[must_use = "futures do nothing unless polled"]
pub struct NextIfEq<'a, St: Stream, T: ?Sized> {
#[pin]
inner: NextIf<'a, St, NextIfEqFn<'a, T, St::Item>>,
}
}

impl<St, T> fmt::Debug for NextIfEq<'_, St, T>
where
St: Stream + fmt::Debug,
St::Item: fmt::Debug,
T: ?Sized,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NextIfEq")
.field("inner", &self.inner.inner.as_ref().map(|(s, _f)| s))
.finish()
}
}

impl<St, T> FusedFuture for NextIfEq<'_, St, T>
where
St: Stream,
T: ?Sized,
St::Item: PartialEq<T>,
{
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}

impl<St, T> Future for NextIfEq<'_, St, T>
where
St: Stream,
T: ?Sized,
St::Item: PartialEq<T>,
{
type Output = Option<St::Item>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
}
}

struct NextIfEqFn<'a, T: ?Sized, Item> {
expected: &'a T,
_next: PhantomData<Item>,
}

impl<T, Item> FnOnce1<&Item> for NextIfEqFn<'_, T, Item>
where
T: ?Sized,
Item: PartialEq<T>,
{
type Output = bool;

fn call_once(self, next: &Item) -> Self::Output {
next == self.expected
}
}
26 changes: 26 additions & 0 deletions futures/tests/stream_peekable.rs
Expand Up @@ -11,3 +11,29 @@ fn peekable() {
assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]);
});
}

#[test]
fn peekable_next_if_eq() {
block_on(async {
// first, try on references
let s = stream::iter(vec!["Heart", "of", "Gold"]).peekable();
pin_mut!(s);
// try before `peek()`
assert_eq!(s.as_mut().next_if_eq(&"trillian").await, None);
assert_eq!(s.as_mut().next_if_eq(&"Heart").await, Some("Heart"));
// try after peek()
assert_eq!(s.as_mut().peek().await, Some(&"of"));
assert_eq!(s.as_mut().next_if_eq(&"of").await, Some("of"));
assert_eq!(s.as_mut().next_if_eq(&"zaphod").await, None);
// make sure `next()` still behaves
assert_eq!(s.next().await, Some("Gold"));

// make sure comparison works for owned values
let s = stream::iter(vec![String::from("Ludicrous"), "speed".into()]).peekable();
pin_mut!(s);
// make sure basic functionality works
assert_eq!(s.as_mut().next_if_eq("Ludicrous").await, Some("Ludicrous".into()));
assert_eq!(s.as_mut().next_if_eq("speed").await, Some("speed".into()));
assert_eq!(s.as_mut().next_if_eq("").await, None);
});
}