diff --git a/futures-util/src/stream/stream/flat_map_unordered.rs b/futures-util/src/stream/stream/flat_map_unordered.rs index e86b180adb..4638922b0d 100644 --- a/futures-util/src/stream/stream/flat_map_unordered.rs +++ b/futures-util/src/stream/stream/flat_map_unordered.rs @@ -85,8 +85,8 @@ impl ArcWake for PollWaker { } /// Future which contains optional stream. If it's `Some`, it will attempt -/// to call `poll_next` on it, returning `Some((item, stream))` in case of -/// `Poll::Ready(Some(...))` or `None` in case of `Poll::Ready(None)`. +/// to call `poll_next` on it, returning `Some((item, next_item_fut))` in +/// case of `Poll::Ready(Some(...))` or `None` in case of `Poll::Ready(None)`. /// If `poll_next` will return `Poll::Pending`, it will be forwared to /// the future, and current task will be notified by waker. #[must_use = "futures do nothing unless you `.await` or poll them"] @@ -96,12 +96,19 @@ struct StreamFut { impl StreamFut { unsafe_pinned!(stream: Option); + + /// Constructs new `StreamFut` using given `stream`. + fn new(stream: St) -> Self { + Self { + stream: stream.into(), + } + } } impl Unpin for StreamFut {} impl Future for StreamFut { - type Output = Option<(St::Item, St)>; + type Output = Option<(St::Item, StreamFut)>; fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { let item = if let Some(stream) = self.as_mut().stream().as_pin_mut() { @@ -111,9 +118,12 @@ impl Future for StreamFut { }; Poll::Ready(item.map(|item| { - (item, unsafe { - self.get_unchecked_mut().stream.take().unwrap() - }) + ( + item, + StreamFut { + stream: unsafe { self.get_unchecked_mut().stream.take() }, + }, + ) })) } } @@ -264,16 +274,17 @@ where fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { let mut poll_state_value = self.as_mut().poll_state().begin_polling(); + println!("STATE {}", poll_state_value); + let mut next_item = None; let mut need_to_poll_next = NONE; let mut polling_with_two_wakers = poll_state_value & NEED_TO_POLL == NEED_TO_POLL && self.not_exceeded_limit(); - let mut polled_stream = false; - let mut polled_futures = false; + let mut stream_will_be_woken = false; + let mut futures_will_be_woken = false; if poll_state_value & NEED_TO_POLL_STREAM != NONE { if self.not_exceeded_limit() { - polled_stream = true; match if polling_with_two_wakers { let waker = self.create_poll_stream_waker(ctx); let mut ctx = Context::from_waker(&waker); @@ -282,9 +293,7 @@ where self.as_mut().stream().poll_next(ctx) } { Poll::Ready(Some(inner_stream)) => { - self.as_mut().futures().push(StreamFut { - stream: Some(inner_stream), - }); + self.as_mut().futures().push(StreamFut::new(inner_stream)); need_to_poll_next |= NEED_TO_POLL_STREAM; // Polling futures in current iteration with the same context // is ok because we already received `Poll::Ready` from @@ -300,6 +309,7 @@ where polling_with_two_wakers = false; } Poll::Pending => { + stream_will_be_woken = true; if !polling_with_two_wakers { need_to_poll_next |= NEED_TO_POLL_STREAM; } @@ -311,7 +321,6 @@ where } if poll_state_value & NEED_TO_POLL_FUTURES != NONE { - polled_futures = true; match if polling_with_two_wakers { let waker = self.create_poll_futures_waker(ctx); let mut ctx = Context::from_waker(&waker); @@ -319,10 +328,8 @@ where } else { self.as_mut().futures().poll_next(ctx) } { - Poll::Ready(Some(Some((item, stream)))) => { - self.as_mut().futures().push(StreamFut { - stream: Some(stream), - }); + Poll::Ready(Some(Some((item, next_item_fut)))) => { + self.as_mut().futures().push(next_item_fut); next_item = Some(item); need_to_poll_next |= NEED_TO_POLL_FUTURES; } @@ -330,6 +337,7 @@ where need_to_poll_next |= NEED_TO_POLL_FUTURES; } Poll::Pending => { + futures_will_be_woken = true; if !polling_with_two_wakers { need_to_poll_next |= NEED_TO_POLL_FUTURES; } @@ -342,16 +350,12 @@ where let poll_state_value = self.as_mut().poll_state().end_polling(need_to_poll_next); - if poll_state_value & NEED_TO_POLL != NONE { - if !polling_with_two_wakers { - if poll_state_value & NEED_TO_POLL_FUTURES != NONE && !polled_futures - || poll_state_value & NEED_TO_POLL_STREAM != NONE && !polled_stream - { - ctx.waker().wake_by_ref(); - } - } else { - ctx.waker().wake_by_ref(); - } + if poll_state_value & NEED_TO_POLL != NONE + && (polling_with_two_wakers + || (poll_state_value & NEED_TO_POLL_FUTURES != NONE && !futures_will_be_woken + || poll_state_value & NEED_TO_POLL_STREAM != NONE && !stream_will_be_woken)) + { + ctx.waker().wake_by_ref(); } if self.futures.is_empty() && self.is_stream_done || next_item.is_some() {