Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add StreamExt::chunks_timeout #4695

Merged
merged 13 commits into from Jun 6, 2022
41 changes: 41 additions & 0 deletions tokio-stream/src/stream_ext.rs
Expand Up @@ -61,6 +61,8 @@ cfg_time! {
use tokio::time::Duration;
mod throttle;
use throttle::{throttle, Throttle};
mod chunks_timeout;
use chunks_timeout::ChunksTimeout;
}

/// An extension trait for the [`Stream`] trait that provides a variety of
Expand Down Expand Up @@ -993,6 +995,45 @@ pub trait StreamExt: Stream {
{
throttle(duration, self)
}

/// Collects items into batches inside a vector within a deadline.
///
/// `chunks_timeout` attempts to yield a vector of len `capacity` within the deadline,
/// otherwise will yield a vector of len less than `capacity` if the deadline is reached
jefftt marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Example
///
/// ```rust,no_run
/// use std::time::Duration;
/// use tokio::time;
/// use tokio_stream::{self as stream, StreamExt};
/// use futures::FutureExt;
///
/// #[tokio::main]
/// async fn main() {
jefftt marked this conversation as resolved.
Show resolved Hide resolved
/// let iter = vec![1, 2, 3].into_iter();
/// let stream0 = stream::iter(iter);
///
/// let iter = vec![4].into_iter();
/// let stream1 = stream::iter(iter)
/// .then(move |n| time::sleep(Duration::from_secs(2)).map(move |_| n));
///
/// let chunk_stream = stream0
/// .chain(stream1)
/// .chunks_timeout(4, Duration::from_secs(1));
///
/// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
/// assert_eq!(chunk_stream.next().await, Some(vec![4]));
/// }
/// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn chunks_timeout(self, capacity: usize, duration: Duration) -> ChunksTimeout<Self>
where
Self: Sized,
{
ChunksTimeout::new(self, capacity, duration)
}
}

impl<St: ?Sized> StreamExt for St where St: Stream {}
Expand Down
89 changes: 89 additions & 0 deletions tokio-stream/src/stream_ext/chunks_timeout.rs
@@ -0,0 +1,89 @@
use crate::stream_ext::Fuse;
use crate::Stream;
use tokio::time::Sleep;

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;
use std::time::Duration;

pin_project! {
/// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct ChunksTimeout<S: Stream> {
#[pin]
stream: Fuse<S>,
#[pin]
deadline: Sleep,
duration: Duration,
items: Vec<S::Item>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}

impl<S: Stream> ChunksTimeout<S> {
pub(super) fn new(stream: S, capacity: usize, duration: Duration) -> Self {
ChunksTimeout {
stream: Fuse::new(stream),
deadline: tokio::time::sleep(duration),
duration,
items: Vec::with_capacity(capacity),
cap: capacity,
}
}

fn take(mut self: Pin<&mut Self>) -> Vec<S::Item> {
let duration = self.duration;
let cap = self.cap;
let this = self.as_mut().project();
this.deadline.reset(tokio::time::Instant::now() + duration);

std::mem::replace(this.items, Vec::with_capacity(cap))
}
}

impl<S: Stream> Stream for ChunksTimeout<S> {
type Item = Vec<S::Item>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut me = self.as_mut().project();
loop {
match me.stream.as_mut().poll_next(cx) {
Poll::Pending => break,
Poll::Ready(Some(item)) => {
me.items.push(item);
if me.items.len() >= *me.cap {
return Poll::Ready(Some(self.take()));
}
}
Poll::Ready(None) => {
let last = if me.items.is_empty() {
None
} else {
let full_buf = std::mem::take(me.items);
Some(full_buf)
};
jefftt marked this conversation as resolved.
Show resolved Hide resolved

return Poll::Ready(last);
}
}
}

match me.deadline.as_mut().poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(()) => {
return Poll::Ready(Some(self.take()));
jefftt marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(chunk_len);
let upper = upper.and_then(|x| x.checked_add(chunk_len));
(lower, upper)
}
jefftt marked this conversation as resolved.
Show resolved Hide resolved
}
53 changes: 53 additions & 0 deletions tokio-stream/tests/chunks_timeout.rs
@@ -0,0 +1,53 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]

use tokio::time;
use tokio_stream::{self as stream, StreamExt};
use tokio_test::assert_pending;
use tokio_test::task;

use futures::FutureExt;
use std::time::Duration;

#[tokio::test(start_paused = true)]
async fn usage() {
let iter = vec![1, 2, 3].into_iter();
let stream0 = stream::iter(iter);
Darksonn marked this conversation as resolved.
Show resolved Hide resolved

let iter = vec![4].into_iter();
let stream1 =
stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));

let chunk_stream = stream0
.chain(stream1)
.chunks_timeout(4, Duration::from_secs(2));

let mut chunk_stream = task::spawn(chunk_stream);

assert_pending!(chunk_stream.poll_next());
time::advance(Duration::from_secs(2)).await;
assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));

time::advance(Duration::from_secs(2)).await;
assert_eq!(chunk_stream.next().await, Some(vec![4]));
jefftt marked this conversation as resolved.
Show resolved Hide resolved
}

#[tokio::test]
#[ignore]
async fn real_time() {
let iter = vec![1, 2, 3].into_iter();
let stream0 = stream::iter(iter);

let iter = vec![4].into_iter();
let stream1 =
stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));

let chunk_stream = stream0
.chain(stream1)
.chunks_timeout(4, Duration::from_secs(2));

let mut chunk_stream = task::spawn(chunk_stream);

assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
assert_eq!(chunk_stream.next().await, Some(vec![4]));
}