Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement try_chunks #2438

Merged
merged 2 commits into from May 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions futures-util/src/stream/mod.rs
Expand Up @@ -63,6 +63,9 @@ pub use self::try_stream::IntoAsyncRead;
#[cfg(feature = "alloc")]
pub use self::try_stream::{TryBufferUnordered, TryBuffered};

#[cfg(feature = "alloc")]
pub use self::try_stream::{TryChunks, TryChunksError};

// Primitive streams

mod iter;
Expand Down
55 changes: 55 additions & 0 deletions futures-util/src/stream/try_stream/mod.rs
Expand Up @@ -12,6 +12,8 @@ use crate::fns::{
use crate::future::assert_future;
use crate::stream::assert_stream;
use crate::stream::{Inspect, Map};
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::pin::Pin;
use futures_core::{
future::{Future, TryFuture},
Expand Down Expand Up @@ -90,6 +92,12 @@ mod try_concat;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_concat::TryConcat;

#[cfg(feature = "alloc")]
mod try_chunks;
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_chunks::{TryChunks, TryChunksError};

mod try_unfold;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_unfold::{try_unfold, TryUnfold};
Expand Down Expand Up @@ -456,6 +464,53 @@ pub trait TryStreamExt: TryStream {
assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
}

/// An adaptor for chunking up successful items of the stream inside a vector.
///
/// This combinator will attempt to pull successful items from this stream and buffer
/// them into a local vector. At most `capacity` items will get buffered
/// before they're yielded from the returned stream.
///
/// Note that the vectors returned from this iterator may not always have
/// `capacity` elements. If the underlying stream ended and only a partial
/// vector was created, it'll be returned. Additionally if an error happens
/// from the underlying stream then the currently buffered items will be
/// yielded.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// This function is similar to
/// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits
/// early if an error occurs.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, TryChunksError, TryStreamExt};
///
/// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
/// let mut stream = stream.try_chunks(2);
///
/// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
/// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4)));
/// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
/// # })
/// ```
///
/// # Panics
///
/// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn try_chunks(self, capacity: usize) -> TryChunks<Self>
where
Self: Sized,
{
assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>(
TryChunks::new(self, capacity),
)
}

/// Attempt to filter the values produced by this stream according to the
/// provided asynchronous closure.
///
Expand Down
131 changes: 131 additions & 0 deletions futures-util/src/stream/try_stream/try_chunks.rs
@@ -0,0 +1,131 @@
use crate::stream::{Fuse, IntoStream, StreamExt};

use alloc::vec::Vec;
use core::pin::Pin;
use core::{fmt, mem};
use futures_core::ready;
use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project_lite::pin_project;

pin_project! {
/// Stream for the [`try_chunks`](super::TryStreamExt::try_chunks) method.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TryChunks<St: TryStream> {
#[pin]
stream: Fuse<IntoStream<St>>,
items: Vec<St::Ok>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}

impl<St: TryStream> TryChunks<St> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);

Self {
stream: IntoStream::new(stream).fuse(),
items: Vec::with_capacity(capacity),
cap: capacity,
}
}

fn take(self: Pin<&mut Self>) -> Vec<St::Ok> {
let cap = self.cap;
mem::replace(self.project().items, Vec::with_capacity(cap))
}

delegate_access_inner!(stream, St, (. .));
}

impl<St: TryStream> Stream for TryChunks<St> {
#[allow(clippy::type_complexity)]
type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
loop {
match ready!(this.stream.as_mut().try_poll_next(cx)) {
// Push the item into the buffer and check whether it is full.
// If so, replace our buffer with a new and empty one and return
// the full one.
Some(item) => match item {
Ok(item) => {
this.items.push(item);
if this.items.len() >= *this.cap {
return Poll::Ready(Some(Ok(self.take())));
}
}
Err(e) => {
return Poll::Ready(Some(Err(TryChunksError(self.take(), e))));
}
},

// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
None => {
let last = if this.items.is_empty() {
None
} else {
let full_buf = mem::replace(this.items, Vec::new());
Some(full_buf)
};

return Poll::Ready(last.map(Ok));
}
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(chunk_len);
let upper = match upper {
Some(x) => x.checked_add(chunk_len),
None => None,
};
(lower, upper)
}
}

impl<St: TryStream + FusedStream> FusedStream for TryChunks<St> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated() && self.items.is_empty()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for TryChunks<S>
where
S: TryStream + Sink<Item>,
{
type Error = <S as Sink<Item>>::Error;

delegate_sink!(stream, Item);
}

/// Error indicating, that while chunk was collected inner stream produced an error.
///
/// Contains all items that were collected before an error occured, and the stream error itself.
#[derive(PartialEq, Eq)]
pub struct TryChunksError<T, E>(pub Vec<T>, pub E);

impl<T, E: fmt::Debug> fmt::Debug for TryChunksError<T, E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.1.fmt(f)
}
}

impl<T, E: fmt::Display> fmt::Display for TryChunksError<T, E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.1.fmt(f)
}
}

#[cfg(feature = "std")]
impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryChunksError<T, E> {}