diff --git a/futures-util/src/stream/select_with_strategy.rs b/futures-util/src/stream/select_with_strategy.rs index 97417d3858..c553a36752 100644 --- a/futures-util/src/stream/select_with_strategy.rs +++ b/futures-util/src/stream/select_with_strategy.rs @@ -13,13 +13,6 @@ pub enum PollNext { Right, } -enum PollSide { - /// Poll the first stream. - Left, - /// Poll the second stream. - Right, -} - impl PollNext { /// Toggle the value and return the old one. #[must_use] @@ -33,6 +26,13 @@ impl PollNext { old } + + fn other(&self) -> PollNext { + match self { + PollNext::Left => PollNext::Right, + PollNext::Right => PollNext::Left, + } + } } impl Default for PollNext { @@ -49,16 +49,16 @@ enum InternalState { } impl InternalState { - fn finish(&mut self, ps: PollSide) { + fn finish(&mut self, ps: &PollNext) { match (&self, ps) { - (InternalState::Start, PollSide::Left) => { + (InternalState::Start, PollNext::Left) => { *self = InternalState::LeftFinished; } - (InternalState::Start, PollSide::Right) => { + (InternalState::Start, PollNext::Right) => { *self = InternalState::RightFinished; } - (InternalState::LeftFinished, PollSide::Right) - | (InternalState::RightFinished, PollSide::Left) => { + (InternalState::LeftFinished, PollNext::Right) + | (InternalState::RightFinished, PollNext::Left) => { *self = InternalState::BothFinished; } _ => {} @@ -69,6 +69,7 @@ impl InternalState { pin_project! { /// Stream for the [`select_with_strategy()`] function. See function docs for details. #[must_use = "streams do nothing unless polled"] + #[project = SelectWithStrategyProj] pub struct SelectWithStrategy { #[pin] stream1: St1, @@ -210,6 +211,49 @@ where } } +#[inline(always)] +fn poll_side( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, + side: &PollNext, + cx: &mut Context<'_>, +) -> Poll> +where + St1: Stream, + St2: Stream, +{ + match side { + PollNext::Left => select.stream1.as_mut().poll_next(cx), + PollNext::Right => select.stream2.as_mut().poll_next(cx), + } +} + +#[inline(always)] +fn poll_inner( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, + side: &PollNext, + cx: &mut Context<'_>, +) -> Poll> +where + St1: Stream, + St2: Stream, +{ + match poll_side(select, side, cx) { + Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), + Poll::Ready(None) => { + select.internal_state.finish(side); + } + Poll::Pending => (), + }; + let other = side.other(); + match poll_side(select, &other, cx) { + Poll::Ready(None) => { + select.internal_state.finish(&other); + Poll::Ready(None) + } + a => a, + } +} + impl Stream for SelectWithStrategy where St1: Stream, @@ -219,42 +263,12 @@ where type Item = St1::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); + let mut this = self.project(); match this.internal_state { InternalState::Start => match (this.clos)(this.state) { - PollNext::Left => { - match this.stream1.poll_next(cx) { - Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), - Poll::Ready(None) => { - this.internal_state.finish(PollSide::Left); - } - Poll::Pending => (), - }; - match this.stream2.poll_next(cx) { - Poll::Ready(None) => { - this.internal_state.finish(PollSide::Right); - Poll::Ready(None) - } - a => a, - } - } - PollNext::Right => { - match this.stream2.poll_next(cx) { - Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), - Poll::Ready(None) => { - this.internal_state.finish(PollSide::Right); - } - Poll::Pending => (), - }; - match this.stream1.poll_next(cx) { - Poll::Ready(None) => { - this.internal_state.finish(PollSide::Left); - Poll::Ready(None) - } - a => a, - } - } + PollNext::Left => poll_inner(&mut this, &PollNext::Left, cx), + PollNext::Right => poll_inner(&mut this, &PollNext::Right, cx), }, InternalState::LeftFinished => match this.stream2.poll_next(cx) { Poll::Ready(None) => {