Skip to content

Commit

Permalink
Tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn committed Jan 27, 2022
1 parent 69f60cc commit a50c9e4
Showing 1 changed file with 50 additions and 37 deletions.
87 changes: 50 additions & 37 deletions futures-util/src/stream/stream/flatten_unordered.rs
@@ -1,6 +1,7 @@
use alloc::sync::Arc;
use core::{
cell::UnsafeCell,
convert::identity,
fmt,
num::NonZeroUsize,
pin::Pin,
Expand Down Expand Up @@ -67,41 +68,49 @@ impl SharedPollState {
fn start_polling(
&self,
) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
self.state
let value = self
.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
if value & WAKING_ALL == NONE {
Some(POLLING)
} else {
None
}
})
.ok()
.map(|value| {
let bomb = PollStateBomb::new(self, SharedPollState::reset);
.ok()?;
let bomb = PollStateBomb::new(self, SharedPollState::reset);

(value, bomb)
})
Some((value, bomb))
}

/// Starts the waking process and performs bitwise or with the given value.
fn start_waking(
&self,
to_poll: u8,
waking: u8,
) -> (u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>) {
) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
let value = self
.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
let next_value = value | to_poll;

if value & (WOKEN | POLLING) == NONE {
Some(value | to_poll | waking)
Some(next_value | waking)
} else if next_value != value {
Some(next_value)
} else {
Some(value | to_poll)
None
}
})
.unwrap();
let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking));
.ok()?;

if value & (WOKEN | POLLING) == NONE {
let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking));

(value, bomb)
Some((value, bomb))
} else {
None
}
}

/// Sets current state to
Expand All @@ -112,30 +121,42 @@ impl SharedPollState {
/// * Wakers called during the `POLLING` phase won't propagate their calls
/// * `POLLING` phase can't start if some of the wakers are active
/// So no wrapped waker can touch the inner waker's cell, it's safe to poll again.
fn stop_polling(&self, mut to_poll: u8, will_be_woken: bool) -> u8 {
fn stop_polling(&self, to_poll: u8, will_be_woken: bool) -> u8 {
self.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |mut value| {
let mut next_value = to_poll;

value &= NEED_TO_POLL_ALL;
if value != NONE || will_be_woken {
to_poll |= WOKEN;
next_value |= WOKEN;
}
to_poll |= value;
next_value |= value;

Some(to_poll & !POLLING & !WAKING_ALL)
Some(next_value & !POLLING & !WAKING_ALL)
})
.unwrap()
}

/// Toggles state to non-waking, allowing to start polling.
fn stop_waking(&self, waking: u8) -> u8 {
self.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| Some(value & !waking | WOKEN))
.unwrap()
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
let next_value = value & !waking;

if value & WAKING_ALL == waking {
Some(next_value | WOKEN)
} else if next_value != value {
Some(next_value)
} else {
None
}
})
.unwrap_or_else(identity)
}

/// Resets current state allowing to poll the stream and wake up wakers.
fn reset(&self) -> u8 {
self.state.swap(NEED_TO_POLL_ALL, Ordering::SeqCst)
self.state.swap(NEED_TO_POLL_ALL, Ordering::AcqRel)
}
}

Expand Down Expand Up @@ -195,8 +216,9 @@ impl InnerWaker {
waker(self_arc.clone())
}

/// Flags state that waking is started for the waker with the given value.
fn start_waking(&self) -> (u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>) {
/// Attempts to start the waking process for the waker with the given value.
/// If succeeded, then the stream isn't yet woken and not being polled at the moment.
fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
self.poll_state.start_waking(self.need_to_poll, self.waking_state())
}

Expand All @@ -208,33 +230,24 @@ impl InnerWaker {

impl ArcWake for InnerWaker {
fn wake_by_ref(self_arc: &Arc<Self>) {
let (poll_state_value, state_bomb) = self_arc.start_waking();

// Only call waker if the stream
// - isn't woken already
// - isn't being polled at the moment because of safety reasons
//
// Waker will be called at the end of polling if state was changed
if poll_state_value & (WOKEN | POLLING) == NONE {
if let Some((_, state_bomb)) = self_arc.start_waking() {
// Safety: now state is not `POLLING`
let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() };

if let Some(inner_waker) = waker_opt.clone() {
// Stop waking to allow polling stream
let poll_state_value = state_bomb.fire().unwrap();

// Check if the stream isn't woken yet
if poll_state_value & WOKEN == NONE {
// Here we want to call waker only if stream isn't woken yet and
// also to optimize the case when two wakers are called at the same time.
//
// In this case the best strategy will be to propagate only the latest waker's awake,
// and then poll both entities in a single `poll_next` call
if poll_state_value & (WOKEN | WAKING_ALL) == self_arc.waking_state() {
// Wake up inner waker
inner_waker.wake();
}
}
} else {
// At the end of polling state will become `!WAKING_ALL` if stream is being polled
//
// And it's already non-waking if it's woken

state_bomb.deactivate();
}
}
}
Expand Down

0 comments on commit a50c9e4

Please sign in to comment.