From f2ddcfdd6a5c4b8096bcbe4e93d79ea41ca68d8d Mon Sep 17 00:00:00 2001 From: olegnn Date: Thu, 28 May 2020 15:07:54 +0300 Subject: [PATCH] Updated tests + stronger ordering --- .../src/stream/stream/flatten_unordered.rs | 16 ++--- futures/tests/stream.rs | 66 ++++++++++++------- 2 files changed, 50 insertions(+), 32 deletions(-) diff --git a/futures-util/src/stream/stream/flatten_unordered.rs b/futures-util/src/stream/stream/flatten_unordered.rs index 83d3feb0cf..cd69c7694f 100644 --- a/futures-util/src/stream/stream/flatten_unordered.rs +++ b/futures-util/src/stream/stream/flatten_unordered.rs @@ -19,7 +19,7 @@ use pin_project::pin_project; const NONE: u8 = 0; /// Indicates that `futures` need to be polled. -const NEED_TO_POLL_FUTURES: u8 = 0b1; +const NEED_TO_POLL_FUTURES: u8 = 1; /// Indicates that `stream` needs to be polled. const NEED_TO_POLL_STREAM: u8 = 0b10; @@ -30,7 +30,7 @@ const NEED_TO_POLL: u8 = NEED_TO_POLL_FUTURES | NEED_TO_POLL_STREAM; /// Indicates that current stream is polled at the moment. const POLLING: u8 = 0b100; -// Indicates that we already called one of wakers. +// Indicates that it already called one of wakers. const WOKEN: u8 = 0b1000; /// State which used to determine what needs to be polled, and are we polling @@ -50,20 +50,20 @@ impl SharedPollState { /// Swaps state with `POLLING`, returning previous state. fn begin_polling(&self) -> u8 { - self.state.swap(POLLING, Ordering::AcqRel) + self.state.swap(POLLING, Ordering::SeqCst) } /// Performs bitwise or with `to_poll` and given state, returning /// previous state. fn set_or(&self, to_poll: u8) -> u8 { - self.state.fetch_or(to_poll, Ordering::AcqRel) + self.state.fetch_or(to_poll, Ordering::SeqCst) } /// Performs bitwise or with `to_poll` and current state, stores result /// with non-`POLLING` state, and returns disjunction result. - fn end_polling(&self, to_poll: u8) -> u8 { - let to_poll = to_poll | self.state.load(Ordering::Acquire); - self.state.store(to_poll & !POLLING & !WOKEN, Ordering::Release); + fn end_polling(&self, mut to_poll: u8) -> u8 { + to_poll |= self.state.swap(!POLLING & !WOKEN, Ordering::SeqCst); + self.state.fetch_and(to_poll & !POLLING & !WOKEN, Ordering::SeqCst); to_poll } } @@ -319,7 +319,7 @@ where let poll_state_value = this.poll_state.end_polling(need_to_poll_next); - let is_done = this.futures.is_empty() && *this.is_stream_done; + let is_done = *this.is_stream_done && this.futures.is_empty(); if !is_done && poll_state_value & WOKEN == NONE && poll_state_value & NEED_TO_POLL != NONE && (polling_with_two_wakers diff --git a/futures/tests/stream.rs b/futures/tests/stream.rs index 1e04450008..d3d755784b 100644 --- a/futures/tests/stream.rs +++ b/futures/tests/stream.rs @@ -87,20 +87,16 @@ fn flatten_unordered() { let sleep_time = Duration::from_millis(*self.data.last().unwrap_or(&0) as u64); thread::spawn(move || { thread::sleep(sleep_time); - woken.swap(true, Ordering::Relaxed); + woken.swap(true, Ordering::SeqCst); waker.wake_by_ref(); }); } else { - self.woken.swap(true, Ordering::Relaxed); + self.woken.swap(true, Ordering::SeqCst); ctx.waker().wake_by_ref(); } self.polled = true; Poll::Pending } else { - assert!( - self.woken.swap(false, Ordering::AcqRel), - "Inner stream polled before wake!" - ); self.polled = false; Poll::Ready(self.data.pop()) } @@ -126,27 +122,25 @@ fn flatten_unordered() { let sleep_time = Duration::from_millis(self.base as u64); thread::spawn(move || { thread::sleep(sleep_time); - woken.swap(true, Ordering::Relaxed); + woken.swap(true, Ordering::SeqCst); waker.wake_by_ref(); }); } else { - self.woken.swap(true, Ordering::Relaxed); + self.woken.swap(true, Ordering::SeqCst); ctx.waker().wake_by_ref(); } Poll::Pending } else { assert!( - self.woken.swap(false, Ordering::AcqRel), - "Stream polled before wake!" + self.woken.swap(false, Ordering::SeqCst), + format!("Stream polled before wake! {}", self.base) ); + let data: Vec<_> = (0..6).into_iter().map(|v| v + self.base * 6).collect(); self.base += 1; self.polled = false; Poll::Ready(Some(DataStream { polled: false, - data: vec![9, 8, 7, 6, 5] - .into_iter() - .map(|v| v * self.base) - .collect(), + data, wake_immediately: self.wake_immediately && self.base % 2 == 0, woken: Arc::new(AtomicBool::new(false)), })) @@ -156,9 +150,28 @@ fn flatten_unordered() { // concurrent tests block_on(async { - let fm_unordered = Interchanger { + let mut fl_unordered = Interchanger { + polled: false, + base: 0, + woken: Arc::new(AtomicBool::new(false)), + wake_immediately: false, + } + .take(10) + .map(|s| s.map(identity)) + .flatten() + .collect::>() + .await; + + fl_unordered.sort(); + + assert_eq!(fl_unordered, (0..60).collect::>()); + }); + + // concurrent tests + block_on(async { + let mut fm_unordered = Interchanger { polled: false, - base: 1, + base: 0, woken: Arc::new(AtomicBool::new(false)), wake_immediately: false, } @@ -167,7 +180,9 @@ fn flatten_unordered() { .collect::>() .await; - assert_eq!(fm_unordered.len(), 50); + fm_unordered.sort(); + + assert_eq!(fm_unordered, (0..60).collect::>()); }); // basic behaviour @@ -209,9 +224,9 @@ fn flatten_unordered() { // wake up immmediately block_on(async { - let fl_unordered = Interchanger { + let mut fl_unordered = Interchanger { polled: false, - base: 1, + base: 0, woken: Arc::new(AtomicBool::new(false)), wake_immediately: true, } @@ -221,14 +236,16 @@ fn flatten_unordered() { .collect::>() .await; - assert_eq!(fl_unordered.len(), 50); + fl_unordered.sort(); + + assert_eq!(fl_unordered, (0..60).collect::>()); }); // wake up immmediately block_on(async { - let fm_unordered = Interchanger { + let mut fm_unordered = Interchanger { polled: false, - base: 1, + base: 0, woken: Arc::new(AtomicBool::new(false)), wake_immediately: true, } @@ -237,7 +254,9 @@ fn flatten_unordered() { .collect::>() .await; - assert_eq!(fm_unordered.len(), 50); + fm_unordered.sort(); + + assert_eq!(fm_unordered, (0..60).collect::>()); }); } @@ -310,7 +329,6 @@ fn take_until() { }); } -#[cfg(feature = "executor")] // executor:: #[test] #[should_panic] fn ready_chunks_panic_on_cap_zero() {