Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add StreamExt::chunks_timeout #4695

Merged
merged 13 commits into from Jun 6, 2022
53 changes: 53 additions & 0 deletions tokio-stream/src/stream_ext.rs
Expand Up @@ -61,6 +61,8 @@ cfg_time! {
use tokio::time::Duration;
mod throttle;
use throttle::{throttle, Throttle};
mod chunks_timeout;
use chunks_timeout::ChunksTimeout;
}

/// An extension trait for the [`Stream`] trait that provides a variety of
Expand Down Expand Up @@ -1005,6 +1007,57 @@ pub trait StreamExt: Stream {
{
throttle(duration, self)
}



jefftt marked this conversation as resolved.
Show resolved Hide resolved
/// Batches the items in the given stream using a maximum duration and size for each batch.
///
/// This stream returns the next batch of items in the following situations:
/// 1. The inner stream has returned at least `max_size` many items since the last batch.
/// 2. The time since the first element of a batch is greater than the given duration.
/// 3. The end of the stream is reached
jefftt marked this conversation as resolved.
Show resolved Hide resolved
///
/// The length of the returned vector is never empty or greater than capacity. Empty batches
jefftt marked this conversation as resolved.
Show resolved Hide resolved
/// will not be emitted if no items are received upstream.
///
/// # Example
///
/// ```rust,no_run
/// use std::time::Duration;
/// use tokio::time;
/// use tokio_stream::{self as stream, StreamExt};
/// use futures::FutureExt;
///
/// #[tokio::main]
/// async fn main() {
jefftt marked this conversation as resolved.
Show resolved Hide resolved
/// let iter = vec![1, 2, 3, 4].into_iter();
/// let stream0 = stream::iter(iter);
///
/// let iter = vec![5].into_iter();
/// let stream1 = stream::iter(iter)
/// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
///
/// let chunk_stream = stream0
/// .chain(stream1)
/// .chunks_timeout(3, Duration::from_secs(2));
/// tokio::pin!(chunk_stream);
///
/// // a full batch was received
/// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
/// // deadline was reached before max_size was reached
/// assert_eq!(chunk_stream.next().await, Some(vec![4]));
/// // last element in the stream
/// assert_eq!(chunk_stream.next().await, Some(vec![5]));
/// }
/// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
where
Self: Sized,
{
ChunksTimeout::new(self, max_size, duration)
}
jefftt marked this conversation as resolved.
Show resolved Hide resolved
}

impl<St: ?Sized> StreamExt for St where St: Stream {}
Expand Down
94 changes: 94 additions & 0 deletions tokio-stream/src/stream_ext/chunks_timeout.rs
@@ -0,0 +1,94 @@
use crate::stream_ext::Fuse;
use crate::Stream;
use tokio::time::{sleep, Instant, Sleep};

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;
use std::time::Duration;

pin_project! {
/// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct ChunksTimeout<S: Stream> {
#[pin]
stream: Fuse<S>,
#[pin]
deadline: Sleep,
poll_deadline: bool,
jefftt marked this conversation as resolved.
Show resolved Hide resolved
duration: Duration,
items: Vec<S::Item>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}

impl<S: Stream> ChunksTimeout<S> {
pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self {
ChunksTimeout {
stream: Fuse::new(stream),
deadline: sleep(duration),
poll_deadline: false,
duration,
items: Vec::with_capacity(max_size),
cap: max_size,
}
}

fn take(mut self: Pin<&mut Self>) -> Vec<S::Item> {
let cap = self.cap;
let this = self.as_mut().project();
*this.poll_deadline = false;

std::mem::replace(this.items, Vec::with_capacity(cap))
}
}

impl<S: Stream> Stream for ChunksTimeout<S> {
type Item = Vec<S::Item>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut me = self.as_mut().project();
loop {
match me.stream.as_mut().poll_next(cx) {
Poll::Pending => break,
Poll::Ready(Some(item)) => {
if me.items.is_empty() {
me.deadline.as_mut().reset(Instant::now() + *me.duration);
*me.poll_deadline = true;
}
jefftt marked this conversation as resolved.
Show resolved Hide resolved
me.items.push(item);
if me.items.len() >= *me.cap {
return Poll::Ready(Some(self.take()));
}
}
Poll::Ready(None) => {
let last = if me.items.is_empty() {
None
} else {
let full_buf = std::mem::take(me.items);
Some(full_buf)
};
jefftt marked this conversation as resolved.
Show resolved Hide resolved

return Poll::Ready(last);
}
}
}

if *me.poll_deadline {
ready!(me.deadline.poll(cx));
return Poll::Ready(Some(self.take()));
}

Poll::Pending
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(chunk_len);
let upper = upper.and_then(|x| x.checked_add(chunk_len));
(lower, upper)
}
jefftt marked this conversation as resolved.
Show resolved Hide resolved
}
84 changes: 84 additions & 0 deletions tokio-stream/tests/chunks_timeout.rs
@@ -0,0 +1,84 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]

use tokio::time;
use tokio_stream::{self as stream, StreamExt};
use tokio_test::assert_pending;
use tokio_test::task;

use futures::FutureExt;
use std::time::Duration;

#[tokio::test(start_paused = true)]
async fn usage() {
let iter = vec![1, 2, 3].into_iter();
let stream0 = stream::iter(iter);
Darksonn marked this conversation as resolved.
Show resolved Hide resolved

let iter = vec![4].into_iter();
let stream1 =
stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));

let chunk_stream = stream0
.chain(stream1)
.chunks_timeout(4, Duration::from_secs(2));

let mut chunk_stream = task::spawn(chunk_stream);

assert_pending!(chunk_stream.poll_next());
time::advance(Duration::from_secs(2)).await;
assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));

assert_pending!(chunk_stream.poll_next());
time::advance(Duration::from_secs(2)).await;
assert_eq!(chunk_stream.next().await, Some(vec![4]));
}

#[tokio::test(start_paused = true)]
async fn full_chunk_with_timeout() {
let iter = vec![1, 2].into_iter();
let stream0 = stream::iter(iter);

let iter = vec![3].into_iter();
let stream1 =
stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n));

let iter = vec![4].into_iter();
let stream2 =
stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));

let chunk_stream = stream0
.chain(stream1)
.chain(stream2)
.chunks_timeout(3, Duration::from_secs(2));

let mut chunk_stream = task::spawn(chunk_stream);

assert_pending!(chunk_stream.poll_next());
time::advance(Duration::from_secs(2)).await;
assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));

assert_pending!(chunk_stream.poll_next());
time::advance(Duration::from_secs(2)).await;
assert_eq!(chunk_stream.next().await, Some(vec![4]));
}

#[tokio::test]
#[ignore]
async fn real_time() {
let iter = vec![1, 2, 3, 4].into_iter();
let stream0 = stream::iter(iter);

let iter = vec![5].into_iter();
let stream1 =
stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));

let chunk_stream = stream0
.chain(stream1)
.chunks_timeout(3, Duration::from_secs(2));

let mut chunk_stream = task::spawn(chunk_stream);

assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
assert_eq!(chunk_stream.next().await, Some(vec![4]));
assert_eq!(chunk_stream.next().await, Some(vec![5]));
}