Skip to content

Commit

Permalink
FlattenUnordered: improve wakers behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn committed Feb 6, 2022
1 parent 10a5dd7 commit c74e9a0
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions futures-util/src/stream/stream/flatten_unordered.rs
Expand Up @@ -92,15 +92,17 @@ impl SharedPollState {
let value = self
.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
let next_value = value | to_poll;

// Waking process for this waker already started
if value & waking != NONE {
return None;
}
let mut next_value = value | to_poll;
// Only start the waking process if we're not in the polling phase and the stream isn't woken already
if value & (WOKEN | POLLING) == NONE {
Some(next_value | waking)
} else if next_value != value {
Some(next_value)
} else {
None
next_value |= waking;
}

(next_value != value).then(|| next_value)
})
.ok()?;

Expand Down Expand Up @@ -141,15 +143,13 @@ impl SharedPollState {
fn stop_waking(&self, waking: u8) -> u8 {
self.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
let next_value = value & !waking;

let mut next_value = value & !waking;
// Waker will be called only if the current waking state is the same as the specified waker state
if value & WAKING_ALL == waking {
Some(next_value | WOKEN)
} else if next_value != value {
Some(next_value)
} else {
None
next_value |= WOKEN;
}

(next_value != value).then(|| next_value)
})
.unwrap_or_else(identity)
}
Expand Down

0 comments on commit c74e9a0

Please sign in to comment.