diff --git a/futures-util/src/stream/stream/flatten_unordered.rs b/futures-util/src/stream/stream/flatten_unordered.rs index 6361565f2b..07f971c55a 100644 --- a/futures-util/src/stream/stream/flatten_unordered.rs +++ b/futures-util/src/stream/stream/flatten_unordered.rs @@ -92,11 +92,17 @@ impl SharedPollState { let value = self .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - let next_value = value | to_poll; - + // Waking process for this waker already started + if value & waking != NONE { + return None; + } + let mut next_value = value | to_poll; + // Only start the waking process if we're not in the polling phase and the stream isn't woken already if value & (WOKEN | POLLING) == NONE { - Some(next_value | waking) - } else if next_value != value { + next_value |= waking; + } + + if next_value != value { Some(next_value) } else { None @@ -141,11 +147,13 @@ impl SharedPollState { fn stop_waking(&self, waking: u8) -> u8 { self.state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - let next_value = value & !waking; - + let mut next_value = value & !waking; + // Waker will be called only if the current waking state is the same as the specified waker state if value & WAKING_ALL == waking { - Some(next_value | WOKEN) - } else if next_value != value { + next_value |= WOKEN; + } + + if next_value != value { Some(next_value) } else { None