From 2df45234ef73cd7d98c377c8d52cacbcecf56b81 Mon Sep 17 00:00:00 2001 From: Marek Kuskowski <50183564+nylonicious@users.noreply.github.com> Date: Wed, 28 Sep 2022 00:34:08 +0200 Subject: [PATCH] stream: fix panic in `ChunksTimeout::new` (#5036) --- tokio-stream/src/stream_ext/chunks_timeout.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tokio-stream/src/stream_ext/chunks_timeout.rs b/tokio-stream/src/stream_ext/chunks_timeout.rs index 107101317a3..48acd9328bc 100644 --- a/tokio-stream/src/stream_ext/chunks_timeout.rs +++ b/tokio-stream/src/stream_ext/chunks_timeout.rs @@ -1,6 +1,6 @@ use crate::stream_ext::Fuse; use crate::Stream; -use tokio::time::{sleep, Instant, Sleep}; +use tokio::time::{sleep, Sleep}; use core::future::Future; use core::pin::Pin; @@ -16,7 +16,7 @@ pin_project! { #[pin] stream: Fuse, #[pin] - deadline: Sleep, + deadline: Option, duration: Duration, items: Vec, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 @@ -27,7 +27,7 @@ impl ChunksTimeout { pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self { ChunksTimeout { stream: Fuse::new(stream), - deadline: sleep(duration), + deadline: None, duration, items: Vec::with_capacity(max_size), cap: max_size, @@ -45,7 +45,7 @@ impl Stream for ChunksTimeout { Poll::Pending => break, Poll::Ready(Some(item)) => { if me.items.is_empty() { - me.deadline.as_mut().reset(Instant::now() + *me.duration); + me.deadline.set(Some(sleep(*me.duration))); me.items.reserve_exact(*me.cap); } me.items.push(item); @@ -67,7 +67,9 @@ impl Stream for ChunksTimeout { } if !me.items.is_empty() { - ready!(me.deadline.poll(cx)); + if let Some(deadline) = me.deadline.as_pin_mut() { + ready!(deadline.poll(cx)); + } return Poll::Ready(Some(std::mem::take(me.items))); }