diff --git a/futures-util/src/stream/stream/all.rs b/futures-util/src/stream/stream/all.rs new file mode 100644 index 0000000000..ba2baa5cf1 --- /dev/null +++ b/futures-util/src/stream/stream/all.rs @@ -0,0 +1,92 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::Stream; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`all`](super::StreamExt::all) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct All { + #[pin] + stream: St, + f: F, + accum: Option, + #[pin] + future: Option, + } +} + +impl fmt::Debug for All +where + St: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("All") + .field("stream", &self.stream) + .field("accum", &self.accum) + .field("future", &self.future) + .finish() + } +} + +impl All +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, accum: Some(true), future: None } + } +} + +impl FusedFuture for All +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, +{ + fn is_terminated(&self) -> bool { + self.accum.is_none() && self.future.is_none() + } +} + +impl Future for All +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + Poll::Ready(loop { + if let Some(fut) = this.future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new accum value + let acc = this.accum.unwrap() && ready!(fut.poll(cx)); + if !acc { + break false; + } // early exit + *this.accum = Some(acc); + this.future.set(None); + } else if this.accum.is_some() { + // we're waiting on a new item from the stream + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(item) => { + this.future.set(Some((this.f)(item))); + } + None => { + break this.accum.take().unwrap(); + } + } + } else { + panic!("All polled after completion") + } + }) + } +} diff --git a/futures-util/src/stream/stream/any.rs b/futures-util/src/stream/stream/any.rs new file mode 100644 index 0000000000..f023125c70 --- /dev/null +++ b/futures-util/src/stream/stream/any.rs @@ -0,0 +1,92 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::Stream; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`any`](super::StreamExt::any) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Any { + #[pin] + stream: St, + f: F, + accum: Option, + #[pin] + future: Option, + } +} + +impl fmt::Debug for Any +where + St: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Any") + .field("stream", &self.stream) + .field("accum", &self.accum) + .field("future", &self.future) + .finish() + } +} + +impl Any +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, accum: Some(false), future: None } + } +} + +impl FusedFuture for Any +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, +{ + fn is_terminated(&self) -> bool { + self.accum.is_none() && self.future.is_none() + } +} + +impl Future for Any +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + Poll::Ready(loop { + if let Some(fut) = this.future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new accum value + let acc = this.accum.unwrap() || ready!(fut.poll(cx)); + if acc { + break true; + } // early exit + *this.accum = Some(acc); + this.future.set(None); + } else if this.accum.is_some() { + // we're waiting on a new item from the stream + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(item) => { + this.future.set(Some((this.f)(item))); + } + None => { + break this.accum.take().unwrap(); + } + } + } else { + panic!("Any polled after completion") + } + }) + } +} diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 420ac82dea..b3b0155f67 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -70,6 +70,14 @@ mod fold; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::fold::Fold; +mod any; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::any::Any; + +mod all; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::all::All; + #[cfg(feature = "sink")] mod forward; @@ -627,6 +635,50 @@ pub trait StreamExt: Stream { assert_future::(Fold::new(self, f, init)) } + /// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let number_stream = stream::iter(0..10); + /// let contain_three = number_stream.any(|i| async move { i == 3 }); + /// assert_eq!(contain_three.await, true); + /// # }); + /// ``` + fn any(self, f: F) -> Any + where + F: FnMut(Self::Item) -> Fut, + Fut: Future, + Self: Sized, + { + assert_future::(Any::new(self, f)) + } + + /// Execute predicate over asynchronous stream, and return `true` if all element in stream satisfied a predicate. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let number_stream = stream::iter(0..10); + /// let less_then_twenty = number_stream.all(|i| async move { i < 20 }); + /// assert_eq!(less_then_twenty.await, true); + /// # }); + /// ``` + fn all(self, f: F) -> All + where + F: FnMut(Self::Item) -> Fut, + Fut: Future, + Self: Sized, + { + assert_future::(All::new(self, f)) + } + /// Flattens a stream of streams into just one continuous stream. /// /// # Examples