Skip to content

Commit

Permalink
Add stream::Peekable::{next_if, next_if_eq}
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Apr 10, 2021
1 parent e8fdc8f commit fc02620
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 10 deletions.
2 changes: 1 addition & 1 deletion futures-util/src/stream/mod.rs
Expand Up @@ -19,7 +19,7 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
mod stream;
pub use self::stream::{
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
Fuse, Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
};

Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/mod.rs
Expand Up @@ -123,7 +123,7 @@ pub use self::select_next_some::SelectNextSome;

mod peek;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::peek::{Peek, Peekable};
pub use self::peek::{Peek, Peekable, NextIf, NextIfEq};

mod skip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
Expand Down
228 changes: 220 additions & 8 deletions futures-util/src/stream/stream/peek.rs
@@ -1,5 +1,7 @@
use crate::fns::FnOnce1;
use crate::stream::{Fuse, StreamExt};
use core::fmt;
use core::marker::PhantomData;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::ready;
Expand Down Expand Up @@ -44,10 +46,7 @@ impl<St: Stream> Peekable<St> {
///
/// This method polls the underlying stream and return either a reference
/// to the next item if the stream is ready or passes through any errors.
pub fn poll_peek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<&St::Item>> {
pub fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&St::Item>> {
let mut this = self.project();

Poll::Ready(loop {
Expand All @@ -60,6 +59,96 @@ impl<St: Stream> Peekable<St> {
}
})
}

/// Creates a future which will consume and return the next value of this
/// stream if a condition is true.
///
/// If `func` returns `true` for the next value of this stream, consume and
/// return it. Otherwise, return `None`.
///
/// # Examples
///
/// Consume a number if it's equal to 0.
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
/// use futures::pin_mut;
///
/// let stream = stream::iter(0..5).peekable();
/// pin_mut!(stream);
/// // The first item of the stream is 0; consume it.
/// assert_eq!(stream.as_mut().next_if(|&x| x == 0).await, Some(0));
/// // The next item returned is now 1, so `consume` will return `false`.
/// assert_eq!(stream.as_mut().next_if(|&x| x == 0).await, None);
/// // `next_if` saves the value of the next item if it was not equal to `expected`.
/// assert_eq!(stream.next().await, Some(1));
/// # });
/// ```
///
/// Consume any number less than 10.
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
/// use futures::pin_mut;
///
/// let stream = stream::iter(1..20).peekable();
/// pin_mut!(stream);
/// // Consume all numbers less than 10
/// while stream.as_mut().next_if(|&x| x < 10).await.is_some() {}
/// // The next value returned will be 10
/// assert_eq!(stream.next().await, Some(10));
/// # });
/// ```
pub fn next_if<F>(self: Pin<&mut Self>, func: F) -> NextIf<'_, St, F>
where
F: FnOnce(&St::Item) -> bool,
{
NextIf {
inner: Some((self, func)),
}
}

/// Creates a future which will consume and return the next item if it is
/// equal to `expected`.
///
/// # Example
///
/// Consume a number if it's equal to 0.
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
/// use futures::pin_mut;
///
/// let stream = stream::iter(0..5).peekable();
/// pin_mut!(stream);
/// // The first item of the stream is 0; consume it.
/// assert_eq!(stream.as_mut().next_if_eq(&0).await, Some(0));
/// // The next item returned is now 1, so `consume` will return `false`.
/// assert_eq!(stream.as_mut().next_if_eq(&0).await, None);
/// // `next_if_eq` saves the value of the next item if it was not equal to `expected`.
/// assert_eq!(stream.next().await, Some(1));
/// # });
/// ```
pub fn next_if_eq<'a, T>(self: Pin<&'a mut Self>, expected: &'a T) -> NextIfEq<'a, St, T>
where
T: ?Sized,
St::Item: PartialEq<T>,
{
NextIfEq {
inner: NextIf {
inner: Some((
self,
NextIfEqFn {
expected,
_next: PhantomData,
},
)),
},
}
}
}

impl<St: Stream> FusedStream for Peekable<St> {
Expand Down Expand Up @@ -103,7 +192,7 @@ where
}

pin_project! {
/// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`]
/// Future for the [`Peekable::peek`](self::Peekable::peek) method.
#[must_use = "futures do nothing unless polled"]
pub struct Peek<'a, St: Stream> {
inner: Option<Pin<&'a mut Peekable<St>>>,
Expand All @@ -116,9 +205,7 @@ where
St::Item: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Peek")
.field("inner", &self.inner)
.finish()
f.debug_struct("Peek").field("inner", &self.inner).finish()
}
}

Expand All @@ -133,6 +220,7 @@ where
St: Stream,
{
type Output = Option<&'a St::Item>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
if let Some(peekable) = inner {
Expand All @@ -144,3 +232,127 @@ where
}
}
}

pin_project! {
/// Future for the [`Peekable::next_if`](self::Peekable::next_if) method.
#[must_use = "futures do nothing unless polled"]
pub struct NextIf<'a, St: Stream, F> {
inner: Option<(Pin<&'a mut Peekable<St>>, F)>,
}
}

impl<St, F> fmt::Debug for NextIf<'_, St, F>
where
St: Stream + fmt::Debug,
St::Item: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NextIf")
.field("inner", &self.inner.as_ref().map(|(s, _f)| s))
.finish()
}
}

#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
impl<St, F> FusedFuture for NextIf<'_, St, F>
where
St: Stream,
F: for<'a> FnOnce1<&'a St::Item, Output = bool>,
{
fn is_terminated(&self) -> bool {
self.inner.is_none()
}
}

#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
impl<St, F> Future for NextIf<'_, St, F>
where
St: Stream,
F: for<'a> FnOnce1<&'a St::Item, Output = bool>,
{
type Output = Option<St::Item>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
if let Some((peekable, _)) = inner {
let res = ready!(peekable.as_mut().poll_next(cx));

let (peekable, func) = inner.take().unwrap();
match res {
Some(ref matched) if func.call_once(matched) => Poll::Ready(res),
other => {
let peekable = peekable.project();
// Since we called `self.next()`, we consumed `self.peeked`.
assert!(peekable.peeked.is_none());
*peekable.peeked = other;
Poll::Ready(None)
}
}
} else {
panic!("NextIf polled after completion")
}
}
}

pin_project! {
/// Future for the [`Peekable::next_if_eq`](self::Peekable::next_if_eq) method.
#[must_use = "futures do nothing unless polled"]
pub struct NextIfEq<'a, St: Stream, T: ?Sized> {
#[pin]
inner: NextIf<'a, St, NextIfEqFn<'a, T, St::Item>>,
}
}

impl<St, T> fmt::Debug for NextIfEq<'_, St, T>
where
St: Stream + fmt::Debug,
St::Item: fmt::Debug,
T: ?Sized,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NextIfEq")
.field("inner", &self.inner.inner.as_ref().map(|(s, _f)| s))
.finish()
}
}

impl<St, T> FusedFuture for NextIfEq<'_, St, T>
where
St: Stream,
T: ?Sized,
St::Item: PartialEq<T>,
{
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}

impl<St, T> Future for NextIfEq<'_, St, T>
where
St: Stream,
T: ?Sized,
St::Item: PartialEq<T>,
{
type Output = Option<St::Item>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
}
}

struct NextIfEqFn<'a, T: ?Sized, Item> {
expected: &'a T,
_next: PhantomData<Item>,
}

impl<T, Item> FnOnce1<&Item> for NextIfEqFn<'_, T, Item>
where
T: ?Sized,
Item: PartialEq<T>,
{
type Output = bool;

fn call_once(self, next: &Item) -> Self::Output {
next == self.expected
}
}
26 changes: 26 additions & 0 deletions futures/tests/stream_peekable.rs
Expand Up @@ -11,3 +11,29 @@ fn peekable() {
assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]);
});
}

#[test]
fn peekable_next_if_eq() {
block_on(async {
// first, try on references
let s = stream::iter(vec!["Heart", "of", "Gold"]).peekable();
pin_mut!(s);
// try before `peek()`
assert_eq!(s.as_mut().next_if_eq(&"trillian").await, None);
assert_eq!(s.as_mut().next_if_eq(&"Heart").await, Some("Heart"));
// try after peek()
assert_eq!(s.as_mut().peek().await, Some(&"of"));
assert_eq!(s.as_mut().next_if_eq(&"of").await, Some("of"));
assert_eq!(s.as_mut().next_if_eq(&"zaphod").await, None);
// make sure `next()` still behaves
assert_eq!(s.next().await, Some("Gold"));

// make sure comparison works for owned values
let s = stream::iter(vec![String::from("Ludicrous"), "speed".into()]).peekable();
pin_mut!(s);
// make sure basic functionality works
assert_eq!(s.as_mut().next_if_eq("Ludicrous").await, Some("Ludicrous".into()));
assert_eq!(s.as_mut().next_if_eq("speed").await, Some("speed".into()));
assert_eq!(s.as_mut().next_if_eq("").await, None);
});
}

0 comments on commit fc02620

Please sign in to comment.