From c6a935772573904dcb6a97ca4b917a04b3165462 Mon Sep 17 00:00:00 2001 From: olegnn Date: Thu, 27 Jan 2022 01:30:30 +0300 Subject: [PATCH] Add `woken` state --- .../src/stream/stream/flatten_unordered.rs | 87 +++++++++++++------ 1 file changed, 59 insertions(+), 28 deletions(-) diff --git a/futures-util/src/stream/stream/flatten_unordered.rs b/futures-util/src/stream/stream/flatten_unordered.rs index 428ff9b9fa..ed34595c63 100644 --- a/futures-util/src/stream/stream/flatten_unordered.rs +++ b/futures-util/src/stream/stream/flatten_unordered.rs @@ -46,6 +46,9 @@ const WAKING_STREAM: u8 = 0b10000; /// The base stream and inner streams are being woken at the moment. const WAKING_ALL: u8 = WAKING_STREAM | WAKING_INNER_STREAMS; +/// The stream was waked and will be polled. +const WOKEN: u8 = 0b100000; + /// Determines what needs to be polled, and is stream being polled at the /// moment or not. #[derive(Clone, Debug)] @@ -74,7 +77,7 @@ impl SharedPollState { }) .ok() .map(|value| { - let bomb = PollStateBomb::new(self, |state| state.stop_polling(NEED_TO_POLL_ALL)); + let bomb = PollStateBomb::new(self, SharedPollState::reset); (value, bomb) }) @@ -86,28 +89,52 @@ impl SharedPollState { to_poll: u8, waking: u8, ) -> (u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>) { - let value = self.state.fetch_or(to_poll | waking, Ordering::SeqCst); + let value = self + .state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + if value & WOKEN == NONE { + Some(value | to_poll | waking) + } else { + Some(value | to_poll) + } + }) + .unwrap(); let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking)); (value, bomb) } - /// Sets current state to `!POLLING` allowing to use wakers and `!WALING_ALL` as - /// - Wakers called during the `POLLING` phase won't propagate their calls - /// - `POLLING` phase can't start if some of the wakers are active - /// - /// So no wrapped waker can touch the inner waker's cell, it's safe to poll again. - fn stop_polling(&self, to_poll: u8) -> u8 { + /// Sets current state to + /// - `!POLLING` allowing to use wakers + /// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called, + /// or `will_be_woken` flag supplied + /// - `!WAKING_ALL` as + /// * Wakers called during the `POLLING` phase won't propagate their calls + /// * `POLLING` phase can't start if some of the wakers are active + /// So no wrapped waker can touch the inner waker's cell, it's safe to poll again. + fn stop_polling(&self, mut to_poll: u8, will_be_woken: bool) -> u8 { self.state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - Some((value | to_poll) & !POLLING & !WAKING_ALL) + if value & NEED_TO_POLL_ALL != NONE || will_be_woken { + to_poll |= WOKEN; + } + to_poll |= value; + + Some(to_poll & !POLLING & !WAKING_ALL) }) .unwrap() } /// Toggles state to non-waking, allowing to start polling. fn stop_waking(&self, waking: u8) -> u8 { - self.state.fetch_and(!waking, Ordering::SeqCst) + self.state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| Some(value & !waking | WOKEN)) + .unwrap() + } + + /// Resets current state allowing to poll the stream and wake up wakers. + fn reset(&self) -> u8 { + self.state.swap(NEED_TO_POLL_ALL, Ordering::SeqCst) } } @@ -182,10 +209,12 @@ impl ArcWake for InnerWaker { fn wake_by_ref(self_arc: &Arc) { let (poll_state_value, state_bomb) = self_arc.start_waking(); - // Only call waker if stream isn't being polled because of safety reasons + // Only call waker if the stream + // - isn't woken already + // - isn't being polled at the moment because of safety reasons // // Waker will be called at the end of polling if state was changed - if poll_state_value & POLLING == NONE { + if poll_state_value & (WOKEN | POLLING) == NONE { // Safety: now state is not `POLLING` let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() }; @@ -193,19 +222,17 @@ impl ArcWake for InnerWaker { // Stop waking to allow polling stream let poll_state_value = state_bomb.fire().unwrap(); - // Here we want to optimize the case when two wakers are called at the same time - // - // In this case the best strategy will be to propagate only the latest waker's awake, - // and then poll both entities in a single `poll_next` call - - // Check if this is the latest waker being waking - if poll_state_value & WAKING_ALL == self_arc.waking_state() { + // Check if the stream isn't woken yet + if poll_state_value & WOKEN == NONE { // Wake up inner waker inner_waker.wake(); } } } else { - // At the end of polling state will become `!WAKING_ALL` + // At the end of polling state will become `!WAKING_ALL` if stream is being polled + // + // And it's already non-waking if it's woken + state_bomb.deactivate(); } } @@ -422,20 +449,24 @@ where // We didn't have any `poll_next` panic, so it's time to deactivate the bomb state_bomb.deactivate(); + + let mut force_wake = + // we need to poll the stream and didn't reach the limit yet + need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit() + // or we need to poll inner streams again + || need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE; + // Stop polling and swap the latest state - poll_state_value = this.poll_state.stop_polling(need_to_poll_next); + poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake); + // If state was changed during `POLLING` phase, need to manually call a waker + force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE; + let is_done = *this.is_stream_done && this.inner_streams.is_empty(); if next_item.is_some() || is_done { Poll::Ready(next_item) } else { - // We need to call the waker if state was changed during the polling phase - if poll_state_value & NEED_TO_POLL_ALL != NONE - // or we need to poll the stream and didn't reach the limit yet - || need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit() - // or we need to poll inner streams again - || need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE - { + if force_wake { cx.waker().wake_by_ref(); }