From 3b8ad5cedaaaec2fe9af5981e972918e2dc7b032 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 16 Aug 2022 10:07:46 -0700 Subject: [PATCH] Actually fix the issue --- .../src/stream/select_with_strategy.rs | 15 +++++++---- futures/tests/stream.rs | 26 ++++++++++--------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/futures-util/src/stream/select_with_strategy.rs b/futures-util/src/stream/select_with_strategy.rs index fc5569093..e703042f0 100644 --- a/futures-util/src/stream/select_with_strategy.rs +++ b/futures-util/src/stream/select_with_strategy.rs @@ -52,8 +52,8 @@ impl InternalState { (InternalState::Start, PollNext::Right) => { *self = InternalState::RightFinished; } - (InternalState::LeftFinished, PollNext::Left) - | (InternalState::RightFinished, PollNext::Right) => { + (InternalState::LeftFinished, PollNext::Right) + | (InternalState::RightFinished, PollNext::Left) => { *self = InternalState::BothFinished; } _ => {} @@ -229,18 +229,23 @@ where St1: Stream, St2: Stream, { - match poll_side(select, side, cx) { + let first_done = match poll_side(select, side, cx) { Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), Poll::Ready(None) => { select.internal_state.finish(side); + true } - Poll::Pending => (), + Poll::Pending => false, }; let other = side.other(); match poll_side(select, other, cx) { Poll::Ready(None) => { select.internal_state.finish(other); - Poll::Ready(None) + if first_done { + Poll::Ready(None) + } else { + Poll::Pending + } } a => a, } diff --git a/futures/tests/stream.rs b/futures/tests/stream.rs index d0b3ef9e4..bba1c661d 100644 --- a/futures/tests/stream.rs +++ b/futures/tests/stream.rs @@ -455,7 +455,7 @@ impl Stream for SlowStream { cx.waker().wake_by_ref(); return Poll::Pending; } - if self.times_polled.get() == self.times_should_poll { + if self.times_polled.get() >= self.times_should_poll { return Poll::Ready(None); } Poll::Ready(Some(self.times_polled.get())) @@ -464,15 +464,17 @@ impl Stream for SlowStream { #[test] fn select_with_strategy_doesnt_terminate_early() { - let times_should_poll = 10; - let count = Rc::new(Cell::new(0)); - let b = stream::iter([10, 20]); - - let selected = stream::select_with_strategy( - SlowStream { times_should_poll, times_polled: count.clone() }, - b, - |_: &mut ()| stream::PollNext::Left, - ); - block_on(selected.for_each(|v| async move { println!("{}", v) })); - assert_eq!(count.get(), times_should_poll); + for side in [stream::PollNext::Left, stream::PollNext::Right] { + let times_should_poll = 10; + let count = Rc::new(Cell::new(0)); + let b = stream::iter([10, 20]); + + let mut selected = stream::select_with_strategy( + SlowStream { times_should_poll, times_polled: count.clone() }, + b, + |_: &mut ()| side, + ); + block_on(async move { while let Some(_) = selected.next().await {} }); + assert_eq!(count.get(), times_should_poll + 1); + } }