From 7a78d03de646b67563f3164c6a1723c324de8aad Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Wed, 8 Sep 2021 16:34:23 +0200 Subject: [PATCH] futures-util: add StreamExt::count method Signed-off-by: Petros Angelatos --- futures-util/src/stream/stream/count.rs | 59 +++++++++++++++++++++++++ futures-util/src/stream/stream/mod.rs | 36 +++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 futures-util/src/stream/stream/count.rs diff --git a/futures-util/src/stream/stream/count.rs b/futures-util/src/stream/stream/count.rs new file mode 100644 index 0000000000..d741618ec7 --- /dev/null +++ b/futures-util/src/stream/stream/count.rs @@ -0,0 +1,59 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`count`](super::StreamExt::count) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Count { + #[pin] + stream: St, + count: usize + } +} + +impl fmt::Debug for Count +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Count") + .field("stream", &self.stream) + .field("count", &self.count) + .finish() + } +} + +impl Count { + pub(super) fn new(stream: St) -> Self { + Self { + stream, + count: 0, + } + } +} + +impl FusedFuture for Count { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +impl Future for Count { + type Output = usize; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + Poll::Ready(loop { + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(_) => { *this.count += 1 }, + None => break *this.count + } + }) + } +} diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index e10354312e..44839cfdcf 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -38,6 +38,10 @@ mod concat; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::concat::Concat; +mod count; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::count::Count; + mod cycle; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::cycle::Cycle; @@ -590,6 +594,38 @@ pub trait StreamExt: Stream { assert_future::(Concat::new(self)) } + /// Drives the stream to completion, counting the number of items. + /// + /// # Overflow Behavior + /// + /// The method does no guarding against overflows, so counting elements of a + /// stream with more than [`usize::MAX`] elements either produces the wrong + /// result or panics. If debug assertions are enabled, a panic is guaranteed. + /// + /// # Panics + /// + /// This function might panic if the iterator has more than [`usize::MAX`] + /// elements. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..=10); + /// let count = stream.count().await; + /// + /// assert_eq!(count, 10); + /// # }); + /// ``` + fn count(self) -> Count + where + Self: Sized + { + assert_future::(Count::new(self)) + } + /// Repeats a stream endlessly. /// /// The stream never terminates. Note that you likely want to avoid