Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ready_chunks adaptor #2123

Merged
merged 1 commit into from Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions futures-util/src/stream/mod.rs
Expand Up @@ -24,6 +24,9 @@ pub use self::stream::CatchUnwind;
#[cfg(feature = "alloc")]
pub use self::stream::Chunks;

#[cfg(feature = "alloc")]
pub use self::stream::ReadyChunks;

#[cfg(feature = "sink")]
pub use self::stream::Forward;

Expand Down
32 changes: 32 additions & 0 deletions futures-util/src/stream/stream/mod.rs
Expand Up @@ -128,6 +128,12 @@ mod chunks;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::chunks::Chunks;

#[cfg(feature = "alloc")]
mod ready_chunks;
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::ready_chunks::ReadyChunks;

mod scan;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::scan::Scan;
Expand Down Expand Up @@ -1186,6 +1192,32 @@ pub trait StreamExt: Stream {
Chunks::new(self, capacity)
}

/// An adaptor for chunking up ready items of the stream inside a vector.
///
/// This combinator will attempt to pull ready items from this stream and
/// buffer them into a local vector. At most `capacity` items will get
/// buffered before they're yielded from the returned stream. If underlying
/// stream returns `Poll::Pending`, and collected chunk is not empty, it will
/// be immediately returned.
///
/// If the underlying stream ended and only a partial vector was created,
/// it'll be returned. Additionally if an error happens from the underlying
/// stream then the currently buffered items will be yielded.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// # Panics
///
/// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
where
Self: Sized,
{
ReadyChunks::new(self, capacity)
}

/// A future that completes after the given stream has been fully processed
/// into the sink and the sink has been flushed and closed.
///
Expand Down
146 changes: 146 additions & 0 deletions futures-util/src/stream/stream/ready_chunks.rs
@@ -0,0 +1,146 @@
use crate::stream::Fuse;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use core::mem;
use core::pin::Pin;
use alloc::vec::Vec;

/// Stream for the [`ready_chunks`](super::StreamExt::ready_chunks) method.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct ReadyChunks<St: Stream> {
stream: Fuse<St>,
items: Vec<St::Item>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}

impl<St: Unpin + Stream> Unpin for ReadyChunks<St> {}

impl<St: Stream> ReadyChunks<St> where St: Stream {
unsafe_unpinned!(items: Vec<St::Item>);
unsafe_pinned!(stream: Fuse<St>);

pub(super) fn new(stream: St, capacity: usize) -> ReadyChunks<St> {
assert!(capacity > 0);

ReadyChunks {
stream: super::Fuse::new(stream),
items: Vec::with_capacity(capacity),
cap: capacity,
}
}

fn take(mut self: Pin<&mut Self>) -> Vec<St::Item> {
let cap = self.cap;
mem::replace(self.as_mut().items(), Vec::with_capacity(cap))
}

/// Acquires a reference to the underlying stream that this combinator is
/// pulling from.
pub fn get_ref(&self) -> &St {
self.stream.get_ref()
}

/// Acquires a mutable reference to the underlying stream that this
/// combinator is pulling from.
///
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
pub fn get_mut(&mut self) -> &mut St {
self.stream.get_mut()
}

/// Acquires a pinned mutable reference to the underlying stream that this
/// combinator is pulling from.
///
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
self.stream().get_pin_mut()
}

/// Consumes this combinator, returning the underlying stream.
///
/// Note that this may discard intermediate state of this combinator, so
/// care should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> St {
self.stream.into_inner()
}
}

impl<St: Stream> Stream for ReadyChunks<St> {
type Item = Vec<St::Item>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
loop {
match self.as_mut().stream().poll_next(cx) {
// Flush all collected data if underlying stream doesn't contain
// more ready values
Poll::Pending => {
return if self.items.is_empty() {
Poll::Pending
} else {
Poll::Ready(Some(self.as_mut().take()))
}
}

// Push the ready item into the buffer and check whether it is full.
// If so, replace our buffer with a new and empty one and return
// the full one.
Poll::Ready(Some(item)) => {
self.as_mut().items().push(item);
if self.items.len() >= self.cap {
return Poll::Ready(Some(self.as_mut().take()))
}
}

// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
Poll::Ready(None) => {
let last = if self.items.is_empty() {
None
} else {
let full_buf = mem::replace(self.as_mut().items(), Vec::new());
Some(full_buf)
};

return Poll::Ready(last);
}
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(chunk_len);
let upper = match upper {
Some(x) => x.checked_add(chunk_len),
None => None,
};
(lower, upper)
}
}

impl<St: FusedStream> FusedStream for ReadyChunks<St> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated() && self.items.is_empty()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for ReadyChunks<S>
where
S: Stream + Sink<Item>,
{
type Error = S::Error;

delegate_sink!(stream, Item);
}
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Expand Up @@ -458,7 +458,7 @@ pub mod stream {
#[cfg(feature = "alloc")]
pub use futures_util::stream::{
// For StreamExt:
Chunks,
Chunks, ReadyChunks,
};

#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
Expand Down
39 changes: 39 additions & 0 deletions futures/tests/stream.rs
Expand Up @@ -124,3 +124,42 @@ fn take_until() {
assert_eq!(stream.next().await, None);
});
}

#[test]
#[should_panic]
fn ready_chunks_panic_on_cap_zero() {
use futures::channel::mpsc;
use futures::stream::StreamExt;

let (_, rx1) = mpsc::channel::<()>(1);

let _ = rx1.ready_chunks(0);
}

#[cfg(feature = "executor")] // executor::
#[test]
fn ready_chunks() {
use futures::channel::mpsc;
use futures::stream::StreamExt;
use futures::sink::SinkExt;
use futures::FutureExt;
use futures_test::task::noop_context;

let (mut tx, rx1) = mpsc::channel::<i32>(16);

let mut s = rx1.ready_chunks(2);

let mut cx = noop_context();
assert!(s.next().poll_unpin(&mut cx).is_pending());

futures::executor::block_on(async {
tx.send(1).await.unwrap();

assert_eq!(s.next().await.unwrap(), vec![1]);
tx.send(2).await.unwrap();
tx.send(3).await.unwrap();
tx.send(4).await.unwrap();
assert_eq!(s.next().await.unwrap(), vec![2,3]);
assert_eq!(s.next().await.unwrap(), vec![4]);
});
}