Skip to content

Commit

Permalink
stream: fix panic in ChunksTimeout::new (#5036)
Browse files Browse the repository at this point in the history
  • Loading branch information
nylonicious committed Sep 27, 2022
1 parent aedcec6 commit 2df4523
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions tokio-stream/src/stream_ext/chunks_timeout.rs
@@ -1,6 +1,6 @@
use crate::stream_ext::Fuse;
use crate::Stream;
use tokio::time::{sleep, Instant, Sleep};
use tokio::time::{sleep, Sleep};

use core::future::Future;
use core::pin::Pin;
Expand All @@ -16,7 +16,7 @@ pin_project! {
#[pin]
stream: Fuse<S>,
#[pin]
deadline: Sleep,
deadline: Option<Sleep>,
duration: Duration,
items: Vec<S::Item>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
Expand All @@ -27,7 +27,7 @@ impl<S: Stream> ChunksTimeout<S> {
pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self {
ChunksTimeout {
stream: Fuse::new(stream),
deadline: sleep(duration),
deadline: None,
duration,
items: Vec::with_capacity(max_size),
cap: max_size,
Expand All @@ -45,7 +45,7 @@ impl<S: Stream> Stream for ChunksTimeout<S> {
Poll::Pending => break,
Poll::Ready(Some(item)) => {
if me.items.is_empty() {
me.deadline.as_mut().reset(Instant::now() + *me.duration);
me.deadline.set(Some(sleep(*me.duration)));
me.items.reserve_exact(*me.cap);
}
me.items.push(item);
Expand All @@ -67,7 +67,9 @@ impl<S: Stream> Stream for ChunksTimeout<S> {
}

if !me.items.is_empty() {
ready!(me.deadline.poll(cx));
if let Some(deadline) = me.deadline.as_pin_mut() {
ready!(deadline.poll(cx));
}
return Poll::Ready(Some(std::mem::take(me.items)));
}

Expand Down

0 comments on commit 2df4523

Please sign in to comment.