From 99a242914302b94807fffb2e7bcc3e4832334d66 Mon Sep 17 00:00:00 2001 From: Jeff Lai Date: Sun, 15 May 2022 18:41:29 -0400 Subject: [PATCH 01/12] stream: add StreamExt#chunks_timeout --- tokio-stream/src/stream_ext.rs | 43 +++++++++ tokio-stream/src/stream_ext/chunks_timeout.rs | 93 +++++++++++++++++++ tokio-stream/tests/chunks_timeout.rs | 27 ++++++ 3 files changed, 163 insertions(+) create mode 100644 tokio-stream/src/stream_ext/chunks_timeout.rs create mode 100644 tokio-stream/tests/chunks_timeout.rs diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index a7bc2044f95..4d15a0bc843 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -61,6 +61,8 @@ cfg_time! { use tokio::time::Duration; mod throttle; use throttle::{throttle, Throttle}; + mod chunks_timeout; + use chunks_timeout::ChunksTimeout; } /// An extension trait for the [`Stream`] trait that provides a variety of @@ -993,6 +995,47 @@ pub trait StreamExt: Stream { { throttle(duration, self) } + + /// Collects items into batches inside a vector within a deadline. + /// + /// `chunks_timeout` attempts to yield a vector of len `capacity` within the deadline, + /// otherwise will yield a vector of len less than `capacity` if the deadline is reached + /// + /// # Example + /// + /// ``` + /// use std::time::Duration; + /// use tokio::time; + /// use tokio_stream::{self as stream, StreamExt}; + /// use futures::FutureExt; + /// + /// #[tokio::main] + /// async fn main() { + /// let iter = vec![1, 2, 3].into_iter(); + /// let stream0 = stream::iter(iter); + /// + /// let iter = vec![4].into_iter(); + /// let stream1 = stream::iter(iter) + /// .then(move |n| time::sleep(Duration::from_secs(2)).map(move |_| n)); + /// + /// let chunk_stream = stream0 + /// .chain(stream1) + /// .chunks_timeout(4, Duration::from_secs(1)); + /// + /// tokio::pin!(chunk_stream); + /// + /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3])); + /// assert_eq!(chunk_stream.next().await, Some(vec![4])); + /// } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn chunks_timeout(self, capacity: usize, duration: Duration) -> ChunksTimeout + where + Self: Sized, + { + ChunksTimeout::new(self, capacity, duration) + } } impl StreamExt for St where St: Stream {} diff --git a/tokio-stream/src/stream_ext/chunks_timeout.rs b/tokio-stream/src/stream_ext/chunks_timeout.rs new file mode 100644 index 00000000000..fbdd036ec1b --- /dev/null +++ b/tokio-stream/src/stream_ext/chunks_timeout.rs @@ -0,0 +1,93 @@ +use crate::stream_ext::Fuse; +use crate::Stream; +use tokio::time::Sleep; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; +use std::time::Duration; + +pin_project! { + /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub struct ChunksTimeout { + #[pin] + stream: Fuse, + #[pin] + deadline: Sleep, + duration: Duration, + items: Vec, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 + } +} + +impl ChunksTimeout { + pub(super) fn new(stream: S, capacity: usize, duration: Duration) -> Self { + ChunksTimeout { + stream: Fuse::new(stream), + deadline: tokio::time::sleep(duration), + duration, + items: Vec::with_capacity(capacity), + cap: capacity, + } + } + + fn take(mut self: Pin<&mut Self>) -> Vec { + let duration = self.duration; + let cap = self.cap; + let this = self.as_mut().project(); + this.deadline.reset(tokio::time::Instant::now() + duration); + + std::mem::replace(this.items, Vec::with_capacity(cap)) + } +} + +impl Stream for ChunksTimeout { + type Item = Vec; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut me = self.as_mut().project(); + loop { + match me.stream.as_mut().poll_next(cx) { + Poll::Pending => {} + Poll::Ready(Some(item)) => { + me.items.push(item); + if me.items.len() >= *me.cap { + return Poll::Ready(Some(self.take())); + } + } + + Poll::Ready(None) => { + let last = if me.items.is_empty() { + None + } else { + let full_buf = std::mem::take(me.items); + Some(full_buf) + }; + + return Poll::Ready(last); + } + } + + match me.deadline.as_mut().poll(cx) { + Poll::Pending => {} + Poll::Ready(()) => { + return Poll::Ready(Some(self.take())); + } + } + } + } + + fn size_hint(&self) -> (usize, Option) { + let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(chunk_len); + let upper = match upper { + Some(x) => x.checked_add(chunk_len), + None => None, + }; + (lower, upper) + } +} diff --git a/tokio-stream/tests/chunks_timeout.rs b/tokio-stream/tests/chunks_timeout.rs new file mode 100644 index 00000000000..d6a3f2c9b02 --- /dev/null +++ b/tokio-stream/tests/chunks_timeout.rs @@ -0,0 +1,27 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] + +use tokio::time; +use tokio_stream::{self as stream, StreamExt}; + +use futures::FutureExt; +use std::time::Duration; + +#[tokio::test(flavor = "multi_thread")] +async fn usage() { + let iter = vec![1, 2, 3].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![4].into_iter(); + let stream1 = stream::iter(iter) + .then(move |n| time::sleep(Duration::from_secs(2)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chunks_timeout(4, Duration::from_secs(1)); + + tokio::pin!(chunk_stream); + + assert_eq!(chunk_stream.next().await, Some(vec![1,2,3])); + assert_eq!(chunk_stream.next().await, Some(vec![4])); +} From ab6249420b57c55d85ed4e36a80c429126b96e00 Mon Sep 17 00:00:00 2001 From: Jeff Lai Date: Tue, 24 May 2022 14:09:34 -0400 Subject: [PATCH 02/12] Yield to scheduler and use start_paused for testing --- tokio-stream/src/stream_ext.rs | 6 +-- tokio-stream/src/stream_ext/chunks_timeout.rs | 51 +++++++++---------- tokio-stream/tests/chunks_timeout.rs | 40 ++++++++++++--- 3 files changed, 61 insertions(+), 36 deletions(-) diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index 4d15a0bc843..857f394ebd0 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -1003,7 +1003,7 @@ pub trait StreamExt: Stream { /// /// # Example /// - /// ``` + /// ```rust,no_run /// use std::time::Duration; /// use tokio::time; /// use tokio_stream::{self as stream, StreamExt}; @@ -1022,13 +1022,11 @@ pub trait StreamExt: Stream { /// .chain(stream1) /// .chunks_timeout(4, Duration::from_secs(1)); /// - /// tokio::pin!(chunk_stream); - /// /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3])); /// assert_eq!(chunk_stream.next().await, Some(vec![4])); /// } /// ``` - #[cfg(all(feature = "time"))] + #[cfg(feature = "time")] #[cfg_attr(docsrs, doc(cfg(feature = "time")))] fn chunks_timeout(self, capacity: usize, duration: Duration) -> ChunksTimeout where diff --git a/tokio-stream/src/stream_ext/chunks_timeout.rs b/tokio-stream/src/stream_ext/chunks_timeout.rs index fbdd036ec1b..dd1036b03a0 100644 --- a/tokio-stream/src/stream_ext/chunks_timeout.rs +++ b/tokio-stream/src/stream_ext/chunks_timeout.rs @@ -49,45 +49,44 @@ impl Stream for ChunksTimeout { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.as_mut().project(); - loop { - match me.stream.as_mut().poll_next(cx) { - Poll::Pending => {} - Poll::Ready(Some(item)) => { - me.items.push(item); - if me.items.len() >= *me.cap { - return Poll::Ready(Some(self.take())); - } + + match me.stream.as_mut().poll_next(cx) { + Poll::Pending => {} + Poll::Ready(Some(item)) => { + me.items.push(item); + if me.items.len() >= *me.cap { + return Poll::Ready(Some(self.take())); } + } - Poll::Ready(None) => { - let last = if me.items.is_empty() { - None - } else { - let full_buf = std::mem::take(me.items); - Some(full_buf) - }; + Poll::Ready(None) => { + let last = if me.items.is_empty() { + None + } else { + let full_buf = std::mem::take(me.items); + Some(full_buf) + }; - return Poll::Ready(last); - } + return Poll::Ready(last); } + } - match me.deadline.as_mut().poll(cx) { - Poll::Pending => {} - Poll::Ready(()) => { - return Poll::Ready(Some(self.take())); - } + match me.deadline.as_mut().poll(cx) { + Poll::Pending => {} + Poll::Ready(()) => { + return Poll::Ready(Some(self.take())); } } + + cx.waker().wake_by_ref(); + Poll::Pending } fn size_hint(&self) -> (usize, Option) { let chunk_len = if self.items.is_empty() { 0 } else { 1 }; let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(chunk_len); - let upper = match upper { - Some(x) => x.checked_add(chunk_len), - None => None, - }; + let upper = upper.and_then(|x| x.checked_add(chunk_len)); (lower, upper) } } diff --git a/tokio-stream/tests/chunks_timeout.rs b/tokio-stream/tests/chunks_timeout.rs index d6a3f2c9b02..71d295352e1 100644 --- a/tokio-stream/tests/chunks_timeout.rs +++ b/tokio-stream/tests/chunks_timeout.rs @@ -3,25 +3,53 @@ use tokio::time; use tokio_stream::{self as stream, StreamExt}; +use tokio_test::assert_pending; +use tokio_test::task; use futures::FutureExt; use std::time::Duration; -#[tokio::test(flavor = "multi_thread")] +#[tokio::test(start_paused = true)] async fn usage() { let iter = vec![1, 2, 3].into_iter(); let stream0 = stream::iter(iter); let iter = vec![4].into_iter(); - let stream1 = stream::iter(iter) - .then(move |n| time::sleep(Duration::from_secs(2)).map(move |_| n)); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n)); let chunk_stream = stream0 .chain(stream1) - .chunks_timeout(4, Duration::from_secs(1)); + .chunks_timeout(4, Duration::from_secs(2)); - tokio::pin!(chunk_stream); + let mut chunk_stream = task::spawn(chunk_stream); - assert_eq!(chunk_stream.next().await, Some(vec![1,2,3])); + assert_pending!(chunk_stream.poll_next()); + assert_pending!(chunk_stream.poll_next()); + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(3)).await; + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + + time::advance(Duration::from_secs(3)).await; + assert_eq!(chunk_stream.next().await, Some(vec![4])); +} + +#[tokio::test] +#[ignore] +async fn real_time() { + let iter = vec![1, 2, 3].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![4].into_iter(); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chunks_timeout(4, Duration::from_secs(2)); + + let mut chunk_stream = task::spawn(chunk_stream); + + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); assert_eq!(chunk_stream.next().await, Some(vec![4])); } From 9b60d6068754ed6ba40e920ee913b4a043e248aa Mon Sep 17 00:00:00 2001 From: Jeff Lai Date: Tue, 24 May 2022 19:15:16 -0400 Subject: [PATCH 03/12] Add loop and remove wake_by_ref --- tokio-stream/src/stream_ext/chunks_timeout.rs | 39 +++++++++---------- tokio-stream/tests/chunks_timeout.rs | 6 +-- 2 files changed, 20 insertions(+), 25 deletions(-) diff --git a/tokio-stream/src/stream_ext/chunks_timeout.rs b/tokio-stream/src/stream_ext/chunks_timeout.rs index dd1036b03a0..4e14944deda 100644 --- a/tokio-stream/src/stream_ext/chunks_timeout.rs +++ b/tokio-stream/src/stream_ext/chunks_timeout.rs @@ -49,37 +49,34 @@ impl Stream for ChunksTimeout { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.as_mut().project(); - - match me.stream.as_mut().poll_next(cx) { - Poll::Pending => {} - Poll::Ready(Some(item)) => { - me.items.push(item); - if me.items.len() >= *me.cap { - return Poll::Ready(Some(self.take())); + loop { + match me.stream.as_mut().poll_next(cx) { + Poll::Pending => break, + Poll::Ready(Some(item)) => { + me.items.push(item); + if me.items.len() >= *me.cap { + return Poll::Ready(Some(self.take())); + } } - } - - Poll::Ready(None) => { - let last = if me.items.is_empty() { - None - } else { - let full_buf = std::mem::take(me.items); - Some(full_buf) - }; + Poll::Ready(None) => { + let last = if me.items.is_empty() { + None + } else { + let full_buf = std::mem::take(me.items); + Some(full_buf) + }; - return Poll::Ready(last); + return Poll::Ready(last); + } } } match me.deadline.as_mut().poll(cx) { - Poll::Pending => {} + Poll::Pending => Poll::Pending, Poll::Ready(()) => { return Poll::Ready(Some(self.take())); } } - - cx.waker().wake_by_ref(); - Poll::Pending } fn size_hint(&self) -> (usize, Option) { diff --git a/tokio-stream/tests/chunks_timeout.rs b/tokio-stream/tests/chunks_timeout.rs index 71d295352e1..63a7f151243 100644 --- a/tokio-stream/tests/chunks_timeout.rs +++ b/tokio-stream/tests/chunks_timeout.rs @@ -25,12 +25,10 @@ async fn usage() { let mut chunk_stream = task::spawn(chunk_stream); assert_pending!(chunk_stream.poll_next()); - assert_pending!(chunk_stream.poll_next()); - assert_pending!(chunk_stream.poll_next()); - time::advance(Duration::from_secs(3)).await; + time::advance(Duration::from_secs(2)).await; assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); - time::advance(Duration::from_secs(3)).await; + time::advance(Duration::from_secs(2)).await; assert_eq!(chunk_stream.next().await, Some(vec![4])); } From 3221088ae6c01fe062ed3e512bef3baa57bcf965 Mon Sep 17 00:00:00 2001 From: Jeff Lai Date: Wed, 25 May 2022 12:59:48 -0400 Subject: [PATCH 04/12] Add full_chunk_with_timeout test --- tokio-stream/tests/chunks_timeout.rs | 34 ++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tokio-stream/tests/chunks_timeout.rs b/tokio-stream/tests/chunks_timeout.rs index 63a7f151243..0c1dcd06406 100644 --- a/tokio-stream/tests/chunks_timeout.rs +++ b/tokio-stream/tests/chunks_timeout.rs @@ -28,6 +28,40 @@ async fn usage() { time::advance(Duration::from_secs(2)).await; assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![4])); +} + +#[tokio::test(start_paused = true)] +async fn full_chunk_with_timeout() { + let iter = vec![1, 2].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![3].into_iter(); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n)); + + let iter = vec![4].into_iter(); + let stream2 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chain(stream2) + .chunks_timeout(3, Duration::from_secs(2)); + + let mut chunk_stream = task::spawn(chunk_stream); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![])); + + assert_pending!(chunk_stream.poll_next()); time::advance(Duration::from_secs(2)).await; assert_eq!(chunk_stream.next().await, Some(vec![4])); } From 95d6b06f01be76065212032583854d9c1a77b175 Mon Sep 17 00:00:00 2001 From: Jeff Lai Date: Tue, 31 May 2022 10:37:54 -0400 Subject: [PATCH 05/12] Fix clippy --- tokio-stream/src/stream_ext/chunks_timeout.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tokio-stream/src/stream_ext/chunks_timeout.rs b/tokio-stream/src/stream_ext/chunks_timeout.rs index 4e14944deda..82050f3a392 100644 --- a/tokio-stream/src/stream_ext/chunks_timeout.rs +++ b/tokio-stream/src/stream_ext/chunks_timeout.rs @@ -73,9 +73,7 @@ impl Stream for ChunksTimeout { match me.deadline.as_mut().poll(cx) { Poll::Pending => Poll::Pending, - Poll::Ready(()) => { - return Poll::Ready(Some(self.take())); - } + Poll::Ready(()) => Poll::Ready(Some(self.take())), } } From 0293d8f8cf0997cde416bcda32898942a0026e6e Mon Sep 17 00:00:00 2001 From: Jeff Lai Date: Tue, 31 May 2022 10:59:45 -0400 Subject: [PATCH 06/12] Add more documentation --- tokio-stream/src/stream_ext.rs | 16 ++++++++++++---- tokio-stream/tests/chunks_timeout.rs | 10 ++++++---- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index 857f394ebd0..6cd25d02c2f 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -998,8 +998,10 @@ pub trait StreamExt: Stream { /// Collects items into batches inside a vector within a deadline. /// - /// `chunks_timeout` attempts to yield a vector of len `capacity` within the deadline, - /// otherwise will yield a vector of len less than `capacity` if the deadline is reached + /// Attempts to yield a vector of len `capacity` within the deadline, + /// otherwise will yield a vector of len less than `capacity` if the deadline is reached. + /// If the wrapped stream does not yield a value before the deadline is reached, then an + /// empty vector will be yielded. The deadline will be reset after every batch is yielded. /// /// # Example /// @@ -1016,14 +1018,20 @@ pub trait StreamExt: Stream { /// /// let iter = vec![4].into_iter(); /// let stream1 = stream::iter(iter) - /// .then(move |n| time::sleep(Duration::from_secs(2)).map(move |_| n)); + /// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n)); /// /// let chunk_stream = stream0 /// .chain(stream1) - /// .chunks_timeout(4, Duration::from_secs(1)); + /// .chunks_timeout(3, Duration::from_secs(2)); /// + /// // a full batch was received /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3])); + /// // deadline was reached before capacity was reached /// assert_eq!(chunk_stream.next().await, Some(vec![4])); + /// // no items were emitted within the deadline + /// assert_eq!(chunk_stream.next().await, Some(vec![])); + /// // deadline was reached before capacity was reached + /// assert_eq!(chunk_stream.next().await, Some(vec![5])); /// } /// ``` #[cfg(feature = "time")] diff --git a/tokio-stream/tests/chunks_timeout.rs b/tokio-stream/tests/chunks_timeout.rs index 0c1dcd06406..3df237390d8 100644 --- a/tokio-stream/tests/chunks_timeout.rs +++ b/tokio-stream/tests/chunks_timeout.rs @@ -69,19 +69,21 @@ async fn full_chunk_with_timeout() { #[tokio::test] #[ignore] async fn real_time() { - let iter = vec![1, 2, 3].into_iter(); + let iter = vec![1, 2, 3, 4].into_iter(); let stream0 = stream::iter(iter); - let iter = vec![4].into_iter(); + let iter = vec![5].into_iter(); let stream1 = - stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n)); + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n)); let chunk_stream = stream0 .chain(stream1) - .chunks_timeout(4, Duration::from_secs(2)); + .chunks_timeout(3, Duration::from_secs(2)); let mut chunk_stream = task::spawn(chunk_stream); assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); assert_eq!(chunk_stream.next().await, Some(vec![4])); + assert_eq!(chunk_stream.next().await, Some(vec![])); + assert_eq!(chunk_stream.next().await, Some(vec![5])); } From e4fb89026128885e0fe06a305dd6f778449f43b3 Mon Sep 17 00:00:00 2001 From: Jeff Lai Date: Tue, 31 May 2022 11:47:09 -0400 Subject: [PATCH 07/12] Fix doc test --- tokio-stream/src/stream_ext.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index 6cd25d02c2f..4aa36d9e7b9 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -1013,16 +1013,17 @@ pub trait StreamExt: Stream { /// /// #[tokio::main] /// async fn main() { - /// let iter = vec![1, 2, 3].into_iter(); + /// let iter = vec![1, 2, 3, 4].into_iter(); /// let stream0 = stream::iter(iter); /// - /// let iter = vec![4].into_iter(); + /// let iter = vec![5].into_iter(); /// let stream1 = stream::iter(iter) /// .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n)); /// /// let chunk_stream = stream0 /// .chain(stream1) /// .chunks_timeout(3, Duration::from_secs(2)); + /// tokio::pin!(chunk_stream); /// /// // a full batch was received /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3])); From 7947992d4627f778b6e873a21bb2ba694f5b381b Mon Sep 17 00:00:00 2001 From: Jeff Lai Date: Fri, 3 Jun 2022 22:21:25 -0400 Subject: [PATCH 08/12] Better doc, start timer on first item --- tokio-stream/src/stream_ext.rs | 25 +++++++++-------- tokio-stream/src/stream_ext/chunks_timeout.rs | 27 ++++++++++++------- tokio-stream/tests/chunks_timeout.rs | 5 ---- 3 files changed, 31 insertions(+), 26 deletions(-) diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index e1d08a98116..db0fee611bd 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -1008,12 +1008,17 @@ pub trait StreamExt: Stream { throttle(duration, self) } - /// Collects items into batches inside a vector within a deadline. + + + /// Batches the items in the given stream using a maximum duration and size for each batch. + /// + /// This stream returns the next batch of items in the following situations: + /// 1. The inner stream has returned at least `max_size` many items since the last batch. + /// 2. The time since the first element of a batch is greater than the given duration. + /// 3. The end of the stream is reached /// - /// Attempts to yield a vector of len `capacity` within the deadline, - /// otherwise will yield a vector of len less than `capacity` if the deadline is reached. - /// If the wrapped stream does not yield a value before the deadline is reached, then an - /// empty vector will be yielded. The deadline will be reset after every batch is yielded. + /// The length of the returned vector is never empty or greater than capacity. Empty batches + /// will not be emitted if no items are received upstream. /// /// # Example /// @@ -1039,21 +1044,19 @@ pub trait StreamExt: Stream { /// /// // a full batch was received /// assert_eq!(chunk_stream.next().await, Some(vec![1,2,3])); - /// // deadline was reached before capacity was reached + /// // deadline was reached before max_size was reached /// assert_eq!(chunk_stream.next().await, Some(vec![4])); - /// // no items were emitted within the deadline - /// assert_eq!(chunk_stream.next().await, Some(vec![])); - /// // deadline was reached before capacity was reached + /// // last element in the stream /// assert_eq!(chunk_stream.next().await, Some(vec![5])); /// } /// ``` #[cfg(feature = "time")] #[cfg_attr(docsrs, doc(cfg(feature = "time")))] - fn chunks_timeout(self, capacity: usize, duration: Duration) -> ChunksTimeout + fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout where Self: Sized, { - ChunksTimeout::new(self, capacity, duration) + ChunksTimeout::new(self, max_size, duration) } } diff --git a/tokio-stream/src/stream_ext/chunks_timeout.rs b/tokio-stream/src/stream_ext/chunks_timeout.rs index 82050f3a392..07a5a2870eb 100644 --- a/tokio-stream/src/stream_ext/chunks_timeout.rs +++ b/tokio-stream/src/stream_ext/chunks_timeout.rs @@ -1,6 +1,6 @@ use crate::stream_ext::Fuse; use crate::Stream; -use tokio::time::Sleep; +use tokio::time::{sleep, Instant, Sleep}; use core::future::Future; use core::pin::Pin; @@ -17,6 +17,7 @@ pin_project! { stream: Fuse, #[pin] deadline: Sleep, + poll_deadline: bool, duration: Duration, items: Vec, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 @@ -24,21 +25,21 @@ pin_project! { } impl ChunksTimeout { - pub(super) fn new(stream: S, capacity: usize, duration: Duration) -> Self { + pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self { ChunksTimeout { stream: Fuse::new(stream), - deadline: tokio::time::sleep(duration), + deadline: sleep(duration), + poll_deadline: false, duration, - items: Vec::with_capacity(capacity), - cap: capacity, + items: Vec::with_capacity(max_size), + cap: max_size, } } fn take(mut self: Pin<&mut Self>) -> Vec { - let duration = self.duration; let cap = self.cap; let this = self.as_mut().project(); - this.deadline.reset(tokio::time::Instant::now() + duration); + *this.poll_deadline = false; std::mem::replace(this.items, Vec::with_capacity(cap)) } @@ -53,6 +54,10 @@ impl Stream for ChunksTimeout { match me.stream.as_mut().poll_next(cx) { Poll::Pending => break, Poll::Ready(Some(item)) => { + if me.items.is_empty() { + me.deadline.as_mut().reset(Instant::now() + *me.duration); + *me.poll_deadline = true; + } me.items.push(item); if me.items.len() >= *me.cap { return Poll::Ready(Some(self.take())); @@ -71,10 +76,12 @@ impl Stream for ChunksTimeout { } } - match me.deadline.as_mut().poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(()) => Poll::Ready(Some(self.take())), + if *me.poll_deadline { + ready!(me.deadline.poll(cx)); + return Poll::Ready(Some(self.take())); } + + Poll::Pending } fn size_hint(&self) -> (usize, Option) { diff --git a/tokio-stream/tests/chunks_timeout.rs b/tokio-stream/tests/chunks_timeout.rs index 3df237390d8..ffc7deadd70 100644 --- a/tokio-stream/tests/chunks_timeout.rs +++ b/tokio-stream/tests/chunks_timeout.rs @@ -57,10 +57,6 @@ async fn full_chunk_with_timeout() { time::advance(Duration::from_secs(2)).await; assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); - assert_pending!(chunk_stream.poll_next()); - time::advance(Duration::from_secs(2)).await; - assert_eq!(chunk_stream.next().await, Some(vec![])); - assert_pending!(chunk_stream.poll_next()); time::advance(Duration::from_secs(2)).await; assert_eq!(chunk_stream.next().await, Some(vec![4])); @@ -84,6 +80,5 @@ async fn real_time() { assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); assert_eq!(chunk_stream.next().await, Some(vec![4])); - assert_eq!(chunk_stream.next().await, Some(vec![])); assert_eq!(chunk_stream.next().await, Some(vec![5])); } From 812e1c012080ffeffc7f79b11e4942955affe6fa Mon Sep 17 00:00:00 2001 From: Jeff Lai Date: Sat, 4 Jun 2022 13:12:48 -0400 Subject: [PATCH 09/12] Misc changes for review --- tokio-stream/src/stream_ext.rs | 15 ++++++++----- tokio-stream/src/stream_ext/chunks_timeout.rs | 22 +++++-------------- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index db0fee611bd..fa8e07f1de3 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -1008,27 +1008,31 @@ pub trait StreamExt: Stream { throttle(duration, self) } - - /// Batches the items in the given stream using a maximum duration and size for each batch. /// /// This stream returns the next batch of items in the following situations: /// 1. The inner stream has returned at least `max_size` many items since the last batch. /// 2. The time since the first element of a batch is greater than the given duration. - /// 3. The end of the stream is reached + /// 3. The end of the stream is reached. /// - /// The length of the returned vector is never empty or greater than capacity. Empty batches + /// The length of the returned vector is never empty or greater than the maximum size. Empty batches /// will not be emitted if no items are received upstream. /// + /// # Panics + /// + /// This function panics if `max_size` is zero + /// /// # Example /// - /// ```rust,no_run + /// ```rust /// use std::time::Duration; /// use tokio::time; /// use tokio_stream::{self as stream, StreamExt}; /// use futures::FutureExt; /// /// #[tokio::main] + /// # async fn _unused() {} + /// #[tokio::main(flavor = "current_thread", start_paused = true)] /// async fn main() { /// let iter = vec![1, 2, 3, 4].into_iter(); /// let stream0 = stream::iter(iter); @@ -1056,6 +1060,7 @@ pub trait StreamExt: Stream { where Self: Sized, { + assert!(max_size > 0, "`max_size` must be non-zero."); ChunksTimeout::new(self, max_size, duration) } } diff --git a/tokio-stream/src/stream_ext/chunks_timeout.rs b/tokio-stream/src/stream_ext/chunks_timeout.rs index 07a5a2870eb..037b4339f03 100644 --- a/tokio-stream/src/stream_ext/chunks_timeout.rs +++ b/tokio-stream/src/stream_ext/chunks_timeout.rs @@ -17,7 +17,6 @@ pin_project! { stream: Fuse, #[pin] deadline: Sleep, - poll_deadline: bool, duration: Duration, items: Vec, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 @@ -29,20 +28,11 @@ impl ChunksTimeout { ChunksTimeout { stream: Fuse::new(stream), deadline: sleep(duration), - poll_deadline: false, duration, items: Vec::with_capacity(max_size), cap: max_size, } } - - fn take(mut self: Pin<&mut Self>) -> Vec { - let cap = self.cap; - let this = self.as_mut().project(); - *this.poll_deadline = false; - - std::mem::replace(this.items, Vec::with_capacity(cap)) - } } impl Stream for ChunksTimeout { @@ -56,19 +46,19 @@ impl Stream for ChunksTimeout { Poll::Ready(Some(item)) => { if me.items.is_empty() { me.deadline.as_mut().reset(Instant::now() + *me.duration); - *me.poll_deadline = true; + me.items.reserve_exact(*me.cap); } me.items.push(item); if me.items.len() >= *me.cap { - return Poll::Ready(Some(self.take())); + return Poll::Ready(Some(std::mem::take(me.items))); } } Poll::Ready(None) => { + // Returning Some here is only correct because we fuse the inner stream. let last = if me.items.is_empty() { None } else { - let full_buf = std::mem::take(me.items); - Some(full_buf) + Some(std::mem::take(me.items)) }; return Poll::Ready(last); @@ -76,9 +66,9 @@ impl Stream for ChunksTimeout { } } - if *me.poll_deadline { + if !me.items.is_empty() { ready!(me.deadline.poll(cx)); - return Poll::Ready(Some(self.take())); + return Poll::Ready(Some(std::mem::take(me.items))); } Poll::Pending From e67d2d7e2b89002f5d4820bb4224a82ff2e84937 Mon Sep 17 00:00:00 2001 From: Jeff Lai Date: Sat, 4 Jun 2022 19:48:07 -0400 Subject: [PATCH 10/12] Fix doc --- tokio-stream/src/stream_ext.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index fa8e07f1de3..628271105f2 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -1032,7 +1032,7 @@ pub trait StreamExt: Stream { /// /// #[tokio::main] /// # async fn _unused() {} - /// #[tokio::main(flavor = "current_thread", start_paused = true)] + /// # #[tokio::main(flavor = "current_thread", start_paused = true)] /// async fn main() { /// let iter = vec![1, 2, 3, 4].into_iter(); /// let stream0 = stream::iter(iter); From ae89cb754e4fc18ffad35160e90932fbabe85370 Mon Sep 17 00:00:00 2001 From: Jeff Lai Date: Sat, 4 Jun 2022 19:55:58 -0400 Subject: [PATCH 11/12] consistent vocab --- tokio-stream/src/stream_ext.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index 628271105f2..5927dbd22ab 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -1012,7 +1012,7 @@ pub trait StreamExt: Stream { /// /// This stream returns the next batch of items in the following situations: /// 1. The inner stream has returned at least `max_size` many items since the last batch. - /// 2. The time since the first element of a batch is greater than the given duration. + /// 2. The time since the first item of a batch is greater than the given duration. /// 3. The end of the stream is reached. /// /// The length of the returned vector is never empty or greater than the maximum size. Empty batches From 8ad623132b5cf028267046999167d74a271b3e8b Mon Sep 17 00:00:00 2001 From: jefftt Date: Sun, 5 Jun 2022 09:07:31 -0400 Subject: [PATCH 12/12] Update tokio-stream/src/stream_ext/chunks_timeout.rs Co-authored-by: Alice Ryhl --- tokio-stream/src/stream_ext/chunks_timeout.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-stream/src/stream_ext/chunks_timeout.rs b/tokio-stream/src/stream_ext/chunks_timeout.rs index 037b4339f03..107101317a3 100644 --- a/tokio-stream/src/stream_ext/chunks_timeout.rs +++ b/tokio-stream/src/stream_ext/chunks_timeout.rs @@ -77,7 +77,7 @@ impl Stream for ChunksTimeout { fn size_hint(&self) -> (usize, Option) { let chunk_len = if self.items.is_empty() { 0 } else { 1 }; let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); + let lower = (lower / self.cap).saturating_add(chunk_len); let upper = upper.and_then(|x| x.checked_add(chunk_len)); (lower, upper) }