diff --git a/tokio-stream/src/stream_ext/chunks_timeout.rs b/tokio-stream/src/stream_ext/chunks_timeout.rs index 107101317a3..48acd9328bc 100644 --- a/tokio-stream/src/stream_ext/chunks_timeout.rs +++ b/tokio-stream/src/stream_ext/chunks_timeout.rs @@ -1,6 +1,6 @@ use crate::stream_ext::Fuse; use crate::Stream; -use tokio::time::{sleep, Instant, Sleep}; +use tokio::time::{sleep, Sleep}; use core::future::Future; use core::pin::Pin; @@ -16,7 +16,7 @@ pin_project! { #[pin] stream: Fuse, #[pin] - deadline: Sleep, + deadline: Option, duration: Duration, items: Vec, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 @@ -27,7 +27,7 @@ impl ChunksTimeout { pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self { ChunksTimeout { stream: Fuse::new(stream), - deadline: sleep(duration), + deadline: None, duration, items: Vec::with_capacity(max_size), cap: max_size, @@ -45,7 +45,7 @@ impl Stream for ChunksTimeout { Poll::Pending => break, Poll::Ready(Some(item)) => { if me.items.is_empty() { - me.deadline.as_mut().reset(Instant::now() + *me.duration); + me.deadline.set(Some(sleep(*me.duration))); me.items.reserve_exact(*me.cap); } me.items.push(item); @@ -67,7 +67,9 @@ impl Stream for ChunksTimeout { } if !me.items.is_empty() { - ready!(me.deadline.poll(cx)); + if let Some(deadline) = me.deadline.as_pin_mut() { + ready!(deadline.poll(cx)); + } return Poll::Ready(Some(std::mem::take(me.items))); }