From acdd43f3387ef0c38f9e841d54b80451d96de163 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 15 Aug 2022 17:47:00 -0700 Subject: [PATCH 1/3] Fix incorrect termination of `select_with_strategy` streams --- futures-util/src/stream/select_with_strategy.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/futures-util/src/stream/select_with_strategy.rs b/futures-util/src/stream/select_with_strategy.rs index d9e58adbde..fc55690938 100644 --- a/futures-util/src/stream/select_with_strategy.rs +++ b/futures-util/src/stream/select_with_strategy.rs @@ -52,8 +52,8 @@ impl InternalState { (InternalState::Start, PollNext::Right) => { *self = InternalState::RightFinished; } - (InternalState::LeftFinished, PollNext::Right) - | (InternalState::RightFinished, PollNext::Left) => { + (InternalState::LeftFinished, PollNext::Left) + | (InternalState::RightFinished, PollNext::Right) => { *self = InternalState::BothFinished; } _ => {} From 54bcee74bd8e120f7808a0a77775a9a124a41be1 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 16 Aug 2022 09:36:48 -0700 Subject: [PATCH 2/3] Add example failing test --- futures/tests/stream.rs | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/futures/tests/stream.rs b/futures/tests/stream.rs index 6781a102d2..d0b3ef9e45 100644 --- a/futures/tests/stream.rs +++ b/futures/tests/stream.rs @@ -1,5 +1,9 @@ +use std::cell::Cell; use std::iter; +use std::pin::Pin; +use std::rc::Rc; use std::sync::Arc; +use std::task::Context; use futures::channel::mpsc; use futures::executor::block_on; @@ -9,6 +13,7 @@ use futures::sink::SinkExt; use futures::stream::{self, StreamExt}; use futures::task::Poll; use futures::{ready, FutureExt}; +use futures_core::Stream; use futures_test::task::noop_context; #[test] @@ -436,3 +441,38 @@ fn ready_chunks() { assert_eq!(s.next().await.unwrap(), vec![4]); }); } + +struct SlowStream { + times_should_poll: usize, + times_polled: Rc>, +} +impl Stream for SlowStream { + type Item = usize; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.times_polled.set(self.times_polled.get() + 1); + if self.times_polled.get() % 2 == 0 { + cx.waker().wake_by_ref(); + return Poll::Pending; + } + if self.times_polled.get() == self.times_should_poll { + return Poll::Ready(None); + } + Poll::Ready(Some(self.times_polled.get())) + } +} + +#[test] +fn select_with_strategy_doesnt_terminate_early() { + let times_should_poll = 10; + let count = Rc::new(Cell::new(0)); + let b = stream::iter([10, 20]); + + let selected = stream::select_with_strategy( + SlowStream { times_should_poll, times_polled: count.clone() }, + b, + |_: &mut ()| stream::PollNext::Left, + ); + block_on(selected.for_each(|v| async move { println!("{}", v) })); + assert_eq!(count.get(), times_should_poll); +} From 74e2c4b22189ab080990ec79e04f7539ac732ad2 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 16 Aug 2022 10:07:46 -0700 Subject: [PATCH 3/3] Actually fix the issue --- .../src/stream/select_with_strategy.rs | 15 +++++++---- futures/tests/stream.rs | 26 ++++++++++--------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/futures-util/src/stream/select_with_strategy.rs b/futures-util/src/stream/select_with_strategy.rs index fc55690938..e703042f07 100644 --- a/futures-util/src/stream/select_with_strategy.rs +++ b/futures-util/src/stream/select_with_strategy.rs @@ -52,8 +52,8 @@ impl InternalState { (InternalState::Start, PollNext::Right) => { *self = InternalState::RightFinished; } - (InternalState::LeftFinished, PollNext::Left) - | (InternalState::RightFinished, PollNext::Right) => { + (InternalState::LeftFinished, PollNext::Right) + | (InternalState::RightFinished, PollNext::Left) => { *self = InternalState::BothFinished; } _ => {} @@ -229,18 +229,23 @@ where St1: Stream, St2: Stream, { - match poll_side(select, side, cx) { + let first_done = match poll_side(select, side, cx) { Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), Poll::Ready(None) => { select.internal_state.finish(side); + true } - Poll::Pending => (), + Poll::Pending => false, }; let other = side.other(); match poll_side(select, other, cx) { Poll::Ready(None) => { select.internal_state.finish(other); - Poll::Ready(None) + if first_done { + Poll::Ready(None) + } else { + Poll::Pending + } } a => a, } diff --git a/futures/tests/stream.rs b/futures/tests/stream.rs index d0b3ef9e45..6a302b798e 100644 --- a/futures/tests/stream.rs +++ b/futures/tests/stream.rs @@ -455,7 +455,7 @@ impl Stream for SlowStream { cx.waker().wake_by_ref(); return Poll::Pending; } - if self.times_polled.get() == self.times_should_poll { + if self.times_polled.get() >= self.times_should_poll { return Poll::Ready(None); } Poll::Ready(Some(self.times_polled.get())) @@ -464,15 +464,17 @@ impl Stream for SlowStream { #[test] fn select_with_strategy_doesnt_terminate_early() { - let times_should_poll = 10; - let count = Rc::new(Cell::new(0)); - let b = stream::iter([10, 20]); - - let selected = stream::select_with_strategy( - SlowStream { times_should_poll, times_polled: count.clone() }, - b, - |_: &mut ()| stream::PollNext::Left, - ); - block_on(selected.for_each(|v| async move { println!("{}", v) })); - assert_eq!(count.get(), times_should_poll); + for side in [stream::PollNext::Left, stream::PollNext::Right] { + let times_should_poll = 10; + let count = Rc::new(Cell::new(0)); + let b = stream::iter([10, 20]); + + let mut selected = stream::select_with_strategy( + SlowStream { times_should_poll, times_polled: count.clone() }, + b, + |_: &mut ()| side, + ); + block_on(async move { while selected.next().await.is_some() {} }); + assert_eq!(count.get(), times_should_poll + 1); + } }