From 415ed37a3bf59740dbd77ac94d36c033042cc932 Mon Sep 17 00:00:00 2001 From: Thomas BESSOU Date: Fri, 23 Jul 2021 19:45:07 +0200 Subject: [PATCH] Backport try_chunks to 0.3 #2438 is marked as `0.3-backport: pending`. This is it. :) Resolves #1947 (which specifically mentionned 0.3) --- futures-util/src/stream/mod.rs | 3 + futures-util/src/stream/try_stream/mod.rs | 55 ++++++++ .../src/stream/try_stream/try_chunks.rs | 131 ++++++++++++++++++ 3 files changed, 189 insertions(+) create mode 100644 futures-util/src/stream/try_stream/try_chunks.rs diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 0b2fc90532..fc9e6f36a9 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -62,6 +62,9 @@ pub use self::try_stream::IntoAsyncRead; #[cfg(feature = "alloc")] pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent}; +#[cfg(feature = "alloc")] +pub use self::try_stream::{TryChunks, TryChunksError}; + // Primitive streams mod iter; diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 11cd9c0d31..25f810e4c8 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -12,6 +12,8 @@ use crate::fns::{ use crate::future::assert_future; use crate::stream::assert_stream; use crate::stream::{Inspect, Map}; +#[cfg(feature = "alloc")] +use alloc::vec::Vec; use core::pin::Pin; use futures_core::{ future::{Future, TryFuture}, @@ -94,6 +96,12 @@ mod try_concat; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_concat::TryConcat; +#[cfg(feature = "alloc")] +mod try_chunks; +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_chunks::{TryChunks, TryChunksError}; + mod try_fold; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_fold::TryFold; @@ -572,6 +580,53 @@ pub trait TryStreamExt: TryStream { assert_future::, _>(TryCollect::new(self)) } + /// An adaptor for chunking up successful items of the stream inside a vector. + /// + /// This combinator will attempt to pull successful items from this stream and buffer + /// them into a local vector. At most `capacity` items will get buffered + /// before they're yielded from the returned stream. + /// + /// Note that the vectors returned from this iterator may not always have + /// `capacity` elements. If the underlying stream ended and only a partial + /// vector was created, it'll be returned. Additionally if an error happens + /// from the underlying stream then the currently buffered items will be + /// yielded. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + /// + /// This function is similar to + /// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits + /// early if an error occurs. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, TryChunksError, TryStreamExt}; + /// + /// let stream = stream::iter(vec![Ok::(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]); + /// let mut stream = stream.try_chunks(2); + /// + /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2]))); + /// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4))); + /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6]))); + /// # }) + /// ``` + /// + /// # Panics + /// + /// This method will panic if `capacity` is zero. + #[cfg(feature = "alloc")] + fn try_chunks(self, capacity: usize) -> TryChunks + where + Self: Sized, + { + assert_stream::, TryChunksError>, _>( + TryChunks::new(self, capacity), + ) + } + /// Attempt to filter the values produced by this stream according to the /// provided asynchronous closure. /// diff --git a/futures-util/src/stream/try_stream/try_chunks.rs b/futures-util/src/stream/try_stream/try_chunks.rs new file mode 100644 index 0000000000..ac10df2114 --- /dev/null +++ b/futures-util/src/stream/try_stream/try_chunks.rs @@ -0,0 +1,131 @@ +use crate::stream::{Fuse, IntoStream, StreamExt}; + +use alloc::vec::Vec; +use core::pin::Pin; +use core::{fmt, mem}; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`try_chunks`](super::TryStreamExt::try_chunks) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct TryChunks { + #[pin] + stream: Fuse>, + items: Vec, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 + } +} + +impl TryChunks { + pub(super) fn new(stream: St, capacity: usize) -> Self { + assert!(capacity > 0); + + Self { + stream: IntoStream::new(stream).fuse(), + items: Vec::with_capacity(capacity), + cap: capacity, + } + } + + fn take(self: Pin<&mut Self>) -> Vec { + let cap = self.cap; + mem::replace(self.project().items, Vec::with_capacity(cap)) + } + + delegate_access_inner!(stream, St, (. .)); +} + +impl Stream for TryChunks { + #[allow(clippy::type_complexity)] + type Item = Result, TryChunksError>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + loop { + match ready!(this.stream.as_mut().try_poll_next(cx)) { + // Push the item into the buffer and check whether it is full. + // If so, replace our buffer with a new and empty one and return + // the full one. + Some(item) => match item { + Ok(item) => { + this.items.push(item); + if this.items.len() >= *this.cap { + return Poll::Ready(Some(Ok(self.take()))); + } + } + Err(e) => { + return Poll::Ready(Some(Err(TryChunksError(self.take(), e)))); + } + }, + + // Since the underlying stream ran out of values, return what we + // have buffered, if we have anything. + None => { + let last = if this.items.is_empty() { + None + } else { + let full_buf = mem::replace(this.items, Vec::new()); + Some(full_buf) + }; + + return Poll::Ready(last.map(Ok)); + } + } + } + } + + fn size_hint(&self) -> (usize, Option) { + let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(chunk_len); + let upper = match upper { + Some(x) => x.checked_add(chunk_len), + None => None, + }; + (lower, upper) + } +} + +impl FusedStream for TryChunks { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() && self.items.is_empty() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl Sink for TryChunks +where + S: TryStream + Sink, +{ + type Error = >::Error; + + delegate_sink!(stream, Item); +} + +/// Error indicating, that while chunk was collected inner stream produced an error. +/// +/// Contains all items that were collected before an error occured, and the stream error itself. +#[derive(PartialEq, Eq)] +pub struct TryChunksError(pub Vec, pub E); + +impl fmt::Debug for TryChunksError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.1.fmt(f) + } +} + +impl fmt::Display for TryChunksError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.1.fmt(f) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for TryChunksError {}