From 568c8565ffbb215fd4af73f99fc821e40a8048a9 Mon Sep 17 00:00:00 2001 From: Owen Shepherd Date: Mon, 21 Mar 2022 09:38:14 +0000 Subject: [PATCH 1/5] Remove `Fuse`s from select, and only poll non-terminated streams --- .../src/stream/select_with_strategy.rs | 131 +++++++++++++----- 1 file changed, 96 insertions(+), 35 deletions(-) diff --git a/futures-util/src/stream/select_with_strategy.rs b/futures-util/src/stream/select_with_strategy.rs index 6ccb321aaf..97417d3858 100644 --- a/futures-util/src/stream/select_with_strategy.rs +++ b/futures-util/src/stream/select_with_strategy.rs @@ -1,5 +1,4 @@ use super::assert_stream; -use crate::stream::{Fuse, StreamExt}; use core::{fmt, pin::Pin}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -14,6 +13,13 @@ pub enum PollNext { Right, } +enum PollSide { + /// Poll the first stream. + Left, + /// Poll the second stream. + Right, +} + impl PollNext { /// Toggle the value and return the old one. #[must_use] @@ -35,14 +41,40 @@ impl Default for PollNext { } } +enum InternalState { + Start, + LeftFinished, + RightFinished, + BothFinished, +} + +impl InternalState { + fn finish(&mut self, ps: PollSide) { + match (&self, ps) { + (InternalState::Start, PollSide::Left) => { + *self = InternalState::LeftFinished; + } + (InternalState::Start, PollSide::Right) => { + *self = InternalState::RightFinished; + } + (InternalState::LeftFinished, PollSide::Right) + | (InternalState::RightFinished, PollSide::Left) => { + *self = InternalState::BothFinished; + } + _ => {} + } + } +} + pin_project! { /// Stream for the [`select_with_strategy()`] function. See function docs for details. #[must_use = "streams do nothing unless polled"] pub struct SelectWithStrategy { #[pin] - stream1: Fuse, + stream1: St1, #[pin] - stream2: Fuse, + stream2: St2, + internal_state: InternalState, state: State, clos: Clos, } @@ -121,9 +153,10 @@ where State: Default, { assert_stream::(SelectWithStrategy { - stream1: stream1.fuse(), - stream2: stream2.fuse(), + stream1, + stream2, state: Default::default(), + internal_state: InternalState::Start, clos: which, }) } @@ -132,7 +165,7 @@ impl SelectWithStrategy { /// Acquires a reference to the underlying streams that this combinator is /// pulling from. pub fn get_ref(&self) -> (&St1, &St2) { - (self.stream1.get_ref(), self.stream2.get_ref()) + (&self.stream1, &self.stream2) } /// Acquires a mutable reference to the underlying streams that this @@ -141,7 +174,7 @@ impl SelectWithStrategy { /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. pub fn get_mut(&mut self) -> (&mut St1, &mut St2) { - (self.stream1.get_mut(), self.stream2.get_mut()) + (&mut self.stream1, &mut self.stream2) } /// Acquires a pinned mutable reference to the underlying streams that this @@ -151,7 +184,7 @@ impl SelectWithStrategy { /// stream which may otherwise confuse this combinator. pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { let this = self.project(); - (this.stream1.get_pin_mut(), this.stream2.get_pin_mut()) + (this.stream1, this.stream2) } /// Consumes this combinator, returning the underlying streams. @@ -159,7 +192,7 @@ impl SelectWithStrategy { /// Note that this may discard intermediate state of this combinator, so /// care should be taken to avoid losing resources when this is called. pub fn into_inner(self) -> (St1, St2) { - (self.stream1.into_inner(), self.stream2.into_inner()) + (self.stream1, self.stream2) } } @@ -170,7 +203,10 @@ where Clos: FnMut(&mut State) -> PollNext, { fn is_terminated(&self) -> bool { - self.stream1.is_terminated() && self.stream2.is_terminated() + match self.internal_state { + InternalState::BothFinished => true, + _ => false, + } } } @@ -185,35 +221,60 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - match (this.clos)(this.state) { - PollNext::Left => poll_inner(this.stream1, this.stream2, cx), - PollNext::Right => poll_inner(this.stream2, this.stream1, cx), + match this.internal_state { + InternalState::Start => match (this.clos)(this.state) { + PollNext::Left => { + match this.stream1.poll_next(cx) { + Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), + Poll::Ready(None) => { + this.internal_state.finish(PollSide::Left); + } + Poll::Pending => (), + }; + match this.stream2.poll_next(cx) { + Poll::Ready(None) => { + this.internal_state.finish(PollSide::Right); + Poll::Ready(None) + } + a => a, + } + } + PollNext::Right => { + match this.stream2.poll_next(cx) { + Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), + Poll::Ready(None) => { + this.internal_state.finish(PollSide::Right); + } + Poll::Pending => (), + }; + match this.stream1.poll_next(cx) { + Poll::Ready(None) => { + this.internal_state.finish(PollSide::Left); + Poll::Ready(None) + } + a => a, + } + } + }, + InternalState::LeftFinished => match this.stream2.poll_next(cx) { + Poll::Ready(None) => { + *this.internal_state = InternalState::BothFinished; + Poll::Ready(None) + } + a => a, + }, + InternalState::RightFinished => match this.stream1.poll_next(cx) { + Poll::Ready(None) => { + *this.internal_state = InternalState::BothFinished; + Poll::Ready(None) + } + a => a, + }, + InternalState::BothFinished => Poll::Ready(None), } } } -fn poll_inner( - a: Pin<&mut St1>, - b: Pin<&mut St2>, - cx: &mut Context<'_>, -) -> Poll> -where - St1: Stream, - St2: Stream, -{ - let a_done = match a.poll_next(cx) { - Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), - Poll::Ready(None) => true, - Poll::Pending => false, - }; - - match b.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) if a_done => Poll::Ready(None), - Poll::Ready(None) | Poll::Pending => Poll::Pending, - } -} - impl fmt::Debug for SelectWithStrategy where St1: fmt::Debug, From 323927a9cef31bebcb3441350aac44ad1bdc9ad4 Mon Sep 17 00:00:00 2001 From: Owen Shepherd Date: Mon, 21 Mar 2022 12:12:22 +0000 Subject: [PATCH 2/5] Deduplicate logic from select_with_strategy poll_next branches --- .../src/stream/select_with_strategy.rs | 104 ++++++++++-------- 1 file changed, 59 insertions(+), 45 deletions(-) diff --git a/futures-util/src/stream/select_with_strategy.rs b/futures-util/src/stream/select_with_strategy.rs index 97417d3858..e173285b73 100644 --- a/futures-util/src/stream/select_with_strategy.rs +++ b/futures-util/src/stream/select_with_strategy.rs @@ -13,13 +13,6 @@ pub enum PollNext { Right, } -enum PollSide { - /// Poll the first stream. - Left, - /// Poll the second stream. - Right, -} - impl PollNext { /// Toggle the value and return the old one. #[must_use] @@ -33,6 +26,13 @@ impl PollNext { old } + + fn other(&self) -> PollNext { + match self { + PollNext::Left => PollNext::Right, + PollNext::Right => PollNext::Left, + } + } } impl Default for PollNext { @@ -49,16 +49,16 @@ enum InternalState { } impl InternalState { - fn finish(&mut self, ps: PollSide) { + fn finish(&mut self, ps: &PollNext) { match (&self, ps) { - (InternalState::Start, PollSide::Left) => { + (InternalState::Start, PollNext::Left) => { *self = InternalState::LeftFinished; } - (InternalState::Start, PollSide::Right) => { + (InternalState::Start, PollNext::Right) => { *self = InternalState::RightFinished; } - (InternalState::LeftFinished, PollSide::Right) - | (InternalState::RightFinished, PollSide::Left) => { + (InternalState::LeftFinished, PollNext::Right) + | (InternalState::RightFinished, PollNext::Left) => { *self = InternalState::BothFinished; } _ => {} @@ -69,6 +69,7 @@ impl InternalState { pin_project! { /// Stream for the [`select_with_strategy()`] function. See function docs for details. #[must_use = "streams do nothing unless polled"] + #[project = SelectWithStrategyProj] pub struct SelectWithStrategy { #[pin] stream1: St1, @@ -210,6 +211,49 @@ where } } +#[inline] +fn poll_side( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, + side: &PollNext, + cx: &mut Context<'_>, +) -> Poll> +where + St1: Stream, + St2: Stream, +{ + match side { + PollNext::Left => select.stream1.as_mut().poll_next(cx), + PollNext::Right => select.stream2.as_mut().poll_next(cx), + } +} + +#[inline] +fn poll_inner( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, + side: &PollNext, + cx: &mut Context<'_>, +) -> Poll> +where + St1: Stream, + St2: Stream, +{ + match poll_side(select, side, cx) { + Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), + Poll::Ready(None) => { + select.internal_state.finish(side); + } + Poll::Pending => (), + }; + let other = side.other(); + match poll_side(select, &other, cx) { + Poll::Ready(None) => { + select.internal_state.finish(&other); + Poll::Ready(None) + } + a => a, + } +} + impl Stream for SelectWithStrategy where St1: Stream, @@ -219,42 +263,12 @@ where type Item = St1::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); + let mut this = self.project(); match this.internal_state { InternalState::Start => match (this.clos)(this.state) { - PollNext::Left => { - match this.stream1.poll_next(cx) { - Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), - Poll::Ready(None) => { - this.internal_state.finish(PollSide::Left); - } - Poll::Pending => (), - }; - match this.stream2.poll_next(cx) { - Poll::Ready(None) => { - this.internal_state.finish(PollSide::Right); - Poll::Ready(None) - } - a => a, - } - } - PollNext::Right => { - match this.stream2.poll_next(cx) { - Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), - Poll::Ready(None) => { - this.internal_state.finish(PollSide::Right); - } - Poll::Pending => (), - }; - match this.stream1.poll_next(cx) { - Poll::Ready(None) => { - this.internal_state.finish(PollSide::Left); - Poll::Ready(None) - } - a => a, - } - } + PollNext::Left => poll_inner(&mut this, &PollNext::Left, cx), + PollNext::Right => poll_inner(&mut this, &PollNext::Right, cx), }, InternalState::LeftFinished => match this.stream2.poll_next(cx) { Poll::Ready(None) => { From 9c9e8dff6c193623242a766d793a8ea7b7f43239 Mon Sep 17 00:00:00 2001 From: Owen Shepherd Date: Thu, 24 Mar 2022 22:09:24 +0000 Subject: [PATCH 3/5] Use PollNext::other() in PollNext::toggle() --- futures-util/src/stream/select_with_strategy.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/futures-util/src/stream/select_with_strategy.rs b/futures-util/src/stream/select_with_strategy.rs index e173285b73..ca7b5dad12 100644 --- a/futures-util/src/stream/select_with_strategy.rs +++ b/futures-util/src/stream/select_with_strategy.rs @@ -18,12 +18,7 @@ impl PollNext { #[must_use] pub fn toggle(&mut self) -> Self { let old = *self; - - match self { - PollNext::Left => *self = PollNext::Right, - PollNext::Right => *self = PollNext::Left, - } - + *self = self.other(); old } From 567ca5bd672405acda6a26fabe9406641222cde2 Mon Sep 17 00:00:00 2001 From: Owen Shepherd Date: Fri, 25 Mar 2022 12:50:27 +0000 Subject: [PATCH 4/5] Update pin-project-lite minimum bound to 0.2.6 --- futures-util/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 1bcc4c083e..d95dbc60b6 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -41,7 +41,7 @@ memchr = { version = "2.2", optional = true } futures_01 = { version = "0.1.25", optional = true, package = "futures" } tokio-io = { version = "0.1.9", optional = true } pin-utils = "0.1.0" -pin-project-lite = "0.2.4" +pin-project-lite = "0.2.6" [dev-dependencies] futures = { path = "../futures", features = ["async-await", "thread-pool"] } From 9ef954f00ed7d77f44d3ae20c005478bc322767c Mon Sep 17 00:00:00 2001 From: Owen Shepherd Date: Fri, 25 Mar 2022 15:51:13 +0000 Subject: [PATCH 5/5] Stream selection passes PollNext by value internally --- .../src/stream/select_with_strategy.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/futures-util/src/stream/select_with_strategy.rs b/futures-util/src/stream/select_with_strategy.rs index ca7b5dad12..37dc5fe338 100644 --- a/futures-util/src/stream/select_with_strategy.rs +++ b/futures-util/src/stream/select_with_strategy.rs @@ -44,7 +44,7 @@ enum InternalState { } impl InternalState { - fn finish(&mut self, ps: &PollNext) { + fn finish(&mut self, ps: PollNext) { match (&self, ps) { (InternalState::Start, PollNext::Left) => { *self = InternalState::LeftFinished; @@ -209,7 +209,7 @@ where #[inline] fn poll_side( select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, - side: &PollNext, + side: PollNext, cx: &mut Context<'_>, ) -> Poll> where @@ -225,7 +225,7 @@ where #[inline] fn poll_inner( select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, - side: &PollNext, + side: PollNext, cx: &mut Context<'_>, ) -> Poll> where @@ -240,9 +240,9 @@ where Poll::Pending => (), }; let other = side.other(); - match poll_side(select, &other, cx) { + match poll_side(select, other, cx) { Poll::Ready(None) => { - select.internal_state.finish(&other); + select.internal_state.finish(other); Poll::Ready(None) } a => a, @@ -261,10 +261,10 @@ where let mut this = self.project(); match this.internal_state { - InternalState::Start => match (this.clos)(this.state) { - PollNext::Left => poll_inner(&mut this, &PollNext::Left, cx), - PollNext::Right => poll_inner(&mut this, &PollNext::Right, cx), - }, + InternalState::Start => { + let next_side = (this.clos)(this.state); + poll_inner(&mut this, next_side, cx) + } InternalState::LeftFinished => match this.stream2.poll_next(cx) { Poll::Ready(None) => { *this.internal_state = InternalState::BothFinished;