diff --git a/futures-util/src/stream/stream/flatten_unordered.rs b/futures-util/src/stream/stream/flatten_unordered.rs index 2cdd49c4ed..6361565f2b 100644 --- a/futures-util/src/stream/stream/flatten_unordered.rs +++ b/futures-util/src/stream/stream/flatten_unordered.rs @@ -1,6 +1,7 @@ use alloc::sync::Arc; use core::{ cell::UnsafeCell, + convert::identity, fmt, num::NonZeroUsize, pin::Pin, @@ -67,7 +68,8 @@ impl SharedPollState { fn start_polling( &self, ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { - self.state + let value = self + .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { if value & WAKING_ALL == NONE { Some(POLLING) @@ -75,12 +77,10 @@ impl SharedPollState { None } }) - .ok() - .map(|value| { - let bomb = PollStateBomb::new(self, SharedPollState::reset); + .ok()?; + let bomb = PollStateBomb::new(self, SharedPollState::reset); - (value, bomb) - }) + Some((value, bomb)) } /// Starts the waking process and performs bitwise or with the given value. @@ -88,20 +88,29 @@ impl SharedPollState { &self, to_poll: u8, waking: u8, - ) -> (u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>) { + ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { let value = self .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + let next_value = value | to_poll; + if value & (WOKEN | POLLING) == NONE { - Some(value | to_poll | waking) + Some(next_value | waking) + } else if next_value != value { + Some(next_value) } else { - Some(value | to_poll) + None } }) - .unwrap(); - let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking)); + .ok()?; + + if value & (WOKEN | POLLING) == NONE { + let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking)); - (value, bomb) + Some((value, bomb)) + } else { + None + } } /// Sets current state to @@ -112,16 +121,18 @@ impl SharedPollState { /// * Wakers called during the `POLLING` phase won't propagate their calls /// * `POLLING` phase can't start if some of the wakers are active /// So no wrapped waker can touch the inner waker's cell, it's safe to poll again. - fn stop_polling(&self, mut to_poll: u8, will_be_woken: bool) -> u8 { + fn stop_polling(&self, to_poll: u8, will_be_woken: bool) -> u8 { self.state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |mut value| { + let mut next_value = to_poll; + value &= NEED_TO_POLL_ALL; if value != NONE || will_be_woken { - to_poll |= WOKEN; + next_value |= WOKEN; } - to_poll |= value; + next_value |= value; - Some(to_poll & !POLLING & !WAKING_ALL) + Some(next_value & !POLLING & !WAKING_ALL) }) .unwrap() } @@ -129,13 +140,23 @@ impl SharedPollState { /// Toggles state to non-waking, allowing to start polling. fn stop_waking(&self, waking: u8) -> u8 { self.state - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| Some(value & !waking | WOKEN)) - .unwrap() + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + let next_value = value & !waking; + + if value & WAKING_ALL == waking { + Some(next_value | WOKEN) + } else if next_value != value { + Some(next_value) + } else { + None + } + }) + .unwrap_or_else(identity) } /// Resets current state allowing to poll the stream and wake up wakers. fn reset(&self) -> u8 { - self.state.swap(NEED_TO_POLL_ALL, Ordering::SeqCst) + self.state.swap(NEED_TO_POLL_ALL, Ordering::AcqRel) } } @@ -195,8 +216,9 @@ impl InnerWaker { waker(self_arc.clone()) } - /// Flags state that waking is started for the waker with the given value. - fn start_waking(&self) -> (u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>) { + /// Attempts to start the waking process for the waker with the given value. + /// If succeeded, then the stream isn't yet woken and not being polled at the moment. + fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { self.poll_state.start_waking(self.need_to_poll, self.waking_state()) } @@ -208,14 +230,7 @@ impl InnerWaker { impl ArcWake for InnerWaker { fn wake_by_ref(self_arc: &Arc) { - let (poll_state_value, state_bomb) = self_arc.start_waking(); - - // Only call waker if the stream - // - isn't woken already - // - isn't being polled at the moment because of safety reasons - // - // Waker will be called at the end of polling if state was changed - if poll_state_value & (WOKEN | POLLING) == NONE { + if let Some((_, state_bomb)) = self_arc.start_waking() { // Safety: now state is not `POLLING` let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() }; @@ -223,18 +238,16 @@ impl ArcWake for InnerWaker { // Stop waking to allow polling stream let poll_state_value = state_bomb.fire().unwrap(); - // Check if the stream isn't woken yet - if poll_state_value & WOKEN == NONE { + // Here we want to call waker only if stream isn't woken yet and + // also to optimize the case when two wakers are called at the same time. + // + // In this case the best strategy will be to propagate only the latest waker's awake, + // and then poll both entities in a single `poll_next` call + if poll_state_value & (WOKEN | WAKING_ALL) == self_arc.waking_state() { // Wake up inner waker inner_waker.wake(); } } - } else { - // At the end of polling state will become `!WAKING_ALL` if stream is being polled - // - // And it's already non-waking if it's woken - - state_bomb.deactivate(); } } }