From f5866e5b9f05d4b8e0be7ca2da69529b2723b950 Mon Sep 17 00:00:00 2001 From: Marek Kuskowski <50183564+nylonicious@users.noreply.github.com> Date: Tue, 20 Sep 2022 14:23:46 +0200 Subject: [PATCH 1/2] stream: fix panic in `ChunksTimeout::new` Fixes #5031 --- tokio-stream/src/stream_ext/chunks_timeout.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tokio-stream/src/stream_ext/chunks_timeout.rs b/tokio-stream/src/stream_ext/chunks_timeout.rs index 107101317a3..541bf08e7c4 100644 --- a/tokio-stream/src/stream_ext/chunks_timeout.rs +++ b/tokio-stream/src/stream_ext/chunks_timeout.rs @@ -1,6 +1,6 @@ use crate::stream_ext::Fuse; use crate::Stream; -use tokio::time::{sleep, Instant, Sleep}; +use tokio::time::{sleep, Sleep}; use core::future::Future; use core::pin::Pin; @@ -16,7 +16,7 @@ pin_project! { #[pin] stream: Fuse, #[pin] - deadline: Sleep, + deadline: Option, duration: Duration, items: Vec, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 @@ -27,7 +27,7 @@ impl ChunksTimeout { pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self { ChunksTimeout { stream: Fuse::new(stream), - deadline: sleep(duration), + deadline: None, duration, items: Vec::with_capacity(max_size), cap: max_size, @@ -45,7 +45,7 @@ impl Stream for ChunksTimeout { Poll::Pending => break, Poll::Ready(Some(item)) => { if me.items.is_empty() { - me.deadline.as_mut().reset(Instant::now() + *me.duration); + me.deadline.set(Some(sleep(*me.duration))); me.items.reserve_exact(*me.cap); } me.items.push(item); @@ -67,7 +67,7 @@ impl Stream for ChunksTimeout { } if !me.items.is_empty() { - ready!(me.deadline.poll(cx)); + ready!(me.deadline.as_pin_mut().unwrap().poll(cx)); return Poll::Ready(Some(std::mem::take(me.items))); } From 48c1df15a25f62fb77a52dffd840248c664c8d1f Mon Sep 17 00:00:00 2001 From: Marek Kuskowski <50183564+nylonicious@users.noreply.github.com> Date: Tue, 27 Sep 2022 23:32:49 +0200 Subject: [PATCH 2/2] stream: check for None instead unwrapping --- tokio-stream/src/stream_ext/chunks_timeout.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tokio-stream/src/stream_ext/chunks_timeout.rs b/tokio-stream/src/stream_ext/chunks_timeout.rs index 541bf08e7c4..48acd9328bc 100644 --- a/tokio-stream/src/stream_ext/chunks_timeout.rs +++ b/tokio-stream/src/stream_ext/chunks_timeout.rs @@ -67,7 +67,9 @@ impl Stream for ChunksTimeout { } if !me.items.is_empty() { - ready!(me.deadline.as_pin_mut().unwrap().poll(cx)); + if let Some(deadline) = me.deadline.as_pin_mut() { + ready!(deadline.poll(cx)); + } return Poll::Ready(Some(std::mem::take(me.items))); }