Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add StreamExt::chunks_timeout #4695

Merged
merged 13 commits into from Jun 6, 2022
43 changes: 43 additions & 0 deletions tokio-stream/src/stream_ext.rs
Expand Up @@ -61,6 +61,8 @@ cfg_time! {
use tokio::time::Duration;
mod throttle;
use throttle::{throttle, Throttle};
mod chunks_timeout;
use chunks_timeout::ChunksTimeout;
}

/// An extension trait for the [`Stream`] trait that provides a variety of
Expand Down Expand Up @@ -993,6 +995,47 @@ pub trait StreamExt: Stream {
{
throttle(duration, self)
}

/// Collects items into batches inside a vector within a deadline.
///
/// `chunks_timeout` attempts to yield a vector of len `capacity` within the deadline,
/// otherwise will yield a vector of len less than `capacity` if the deadline is reached
jefftt marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Example
///
/// ```
/// use std::time::Duration;
/// use tokio::time;
/// use tokio_stream::{self as stream, StreamExt};
/// use futures::FutureExt;
///
/// #[tokio::main]
/// async fn main() {
jefftt marked this conversation as resolved.
Show resolved Hide resolved
/// let iter = vec![1, 2, 3].into_iter();
/// let stream0 = stream::iter(iter);
///
/// let iter = vec![4].into_iter();
/// let stream1 = stream::iter(iter)
/// .then(move |n| time::sleep(Duration::from_secs(2)).map(move |_| n));
///
/// let chunk_stream = stream0
/// .chain(stream1)
/// .chunks_timeout(4, Duration::from_secs(1));
///
/// tokio::pin!(chunk_stream);
///
/// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
/// assert_eq!(chunk_stream.next().await, Some(vec![4]));
/// }
/// ```
#[cfg(all(feature = "time"))]
jefftt marked this conversation as resolved.
Show resolved Hide resolved
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn chunks_timeout(self, capacity: usize, duration: Duration) -> ChunksTimeout<Self>
where
Self: Sized,
{
ChunksTimeout::new(self, capacity, duration)
}
}

impl<St: ?Sized> StreamExt for St where St: Stream {}
Expand Down
93 changes: 93 additions & 0 deletions tokio-stream/src/stream_ext/chunks_timeout.rs
@@ -0,0 +1,93 @@
use crate::stream_ext::Fuse;
use crate::Stream;
use tokio::time::Sleep;

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;
use std::time::Duration;

pin_project! {
/// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct ChunksTimeout<S: Stream> {
#[pin]
stream: Fuse<S>,
#[pin]
deadline: Sleep,
duration: Duration,
items: Vec<S::Item>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}

impl<S: Stream> ChunksTimeout<S> {
pub(super) fn new(stream: S, capacity: usize, duration: Duration) -> Self {
ChunksTimeout {
stream: Fuse::new(stream),
deadline: tokio::time::sleep(duration),
duration,
items: Vec::with_capacity(capacity),
cap: capacity,
}
}

fn take(mut self: Pin<&mut Self>) -> Vec<S::Item> {
let duration = self.duration;
let cap = self.cap;
let this = self.as_mut().project();
this.deadline.reset(tokio::time::Instant::now() + duration);

std::mem::replace(this.items, Vec::with_capacity(cap))
}
}

impl<S: Stream> Stream for ChunksTimeout<S> {
type Item = Vec<S::Item>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut me = self.as_mut().project();
loop {
match me.stream.as_mut().poll_next(cx) {
Poll::Pending => {}
Poll::Ready(Some(item)) => {
me.items.push(item);
if me.items.len() >= *me.cap {
return Poll::Ready(Some(self.take()));
}
}

Poll::Ready(None) => {
let last = if me.items.is_empty() {
None
} else {
let full_buf = std::mem::take(me.items);
Some(full_buf)
};
jefftt marked this conversation as resolved.
Show resolved Hide resolved

return Poll::Ready(last);
}
}

match me.deadline.as_mut().poll(cx) {
Poll::Pending => {}
Poll::Ready(()) => {
return Poll::Ready(Some(self.take()));
}
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(chunk_len);
let upper = match upper {
Some(x) => x.checked_add(chunk_len),
None => None,
};
jefftt marked this conversation as resolved.
Show resolved Hide resolved
(lower, upper)
}
}
27 changes: 27 additions & 0 deletions tokio-stream/tests/chunks_timeout.rs
@@ -0,0 +1,27 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]

use tokio::time;
use tokio_stream::{self as stream, StreamExt};

use futures::FutureExt;
use std::time::Duration;

#[tokio::test(flavor = "multi_thread")]
async fn usage() {
jefftt marked this conversation as resolved.
Show resolved Hide resolved
let iter = vec![1, 2, 3].into_iter();
let stream0 = stream::iter(iter);

let iter = vec![4].into_iter();
let stream1 = stream::iter(iter)
.then(move |n| time::sleep(Duration::from_secs(2)).map(move |_| n));

let chunk_stream = stream0
.chain(stream1)
.chunks_timeout(4, Duration::from_secs(1));
jefftt marked this conversation as resolved.
Show resolved Hide resolved

tokio::pin!(chunk_stream);

assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
assert_eq!(chunk_stream.next().await, Some(vec![4]));
}