Skip to content

Commit

Permalink
Updated tests + stronger ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn committed May 28, 2020
1 parent 6adcfee commit 2ca3e96
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 29 deletions.
16 changes: 8 additions & 8 deletions futures-util/src/stream/stream/flatten_unordered.rs
Expand Up @@ -19,7 +19,7 @@ use pin_project::pin_project;
const NONE: u8 = 0;

/// Indicates that `futures` need to be polled.
const NEED_TO_POLL_FUTURES: u8 = 0b1;
const NEED_TO_POLL_FUTURES: u8 = 1;

/// Indicates that `stream` needs to be polled.
const NEED_TO_POLL_STREAM: u8 = 0b10;
Expand All @@ -30,7 +30,7 @@ const NEED_TO_POLL: u8 = NEED_TO_POLL_FUTURES | NEED_TO_POLL_STREAM;
/// Indicates that current stream is polled at the moment.
const POLLING: u8 = 0b100;

// Indicates that we already called one of wakers.
// Indicates that it already called one of wakers.
const WOKEN: u8 = 0b1000;

/// State which used to determine what needs to be polled, and are we polling
Expand All @@ -50,20 +50,20 @@ impl SharedPollState {

/// Swaps state with `POLLING`, returning previous state.
fn begin_polling(&self) -> u8 {
self.state.swap(POLLING, Ordering::AcqRel)
self.state.swap(POLLING, Ordering::SeqCst)
}

/// Performs bitwise or with `to_poll` and given state, returning
/// previous state.
fn set_or(&self, to_poll: u8) -> u8 {
self.state.fetch_or(to_poll, Ordering::AcqRel)
self.state.fetch_or(to_poll, Ordering::SeqCst)
}

/// Performs bitwise or with `to_poll` and current state, stores result
/// with non-`POLLING` state, and returns disjunction result.
fn end_polling(&self, to_poll: u8) -> u8 {
let to_poll = to_poll | self.state.load(Ordering::Acquire);
self.state.store(to_poll & !POLLING & !WOKEN, Ordering::Release);
fn end_polling(&self, mut to_poll: u8) -> u8 {
to_poll |= self.state.swap(!POLLING & !WOKEN, Ordering::SeqCst);
self.state.fetch_and(to_poll & !POLLING & !WOKEN, Ordering::SeqCst);
to_poll
}
}
Expand Down Expand Up @@ -319,7 +319,7 @@ where

let poll_state_value = this.poll_state.end_polling(need_to_poll_next);

let is_done = this.futures.is_empty() && *this.is_stream_done;
let is_done = *this.is_stream_done && this.futures.is_empty();

if !is_done && poll_state_value & WOKEN == NONE && poll_state_value & NEED_TO_POLL != NONE
&& (polling_with_two_wakers
Expand Down
65 changes: 44 additions & 21 deletions futures/tests/stream.rs
Expand Up @@ -87,19 +87,19 @@ fn flatten_unordered() {
let sleep_time = Duration::from_millis(*self.data.last().unwrap_or(&0) as u64);
thread::spawn(move || {
thread::sleep(sleep_time);
woken.swap(true, Ordering::Relaxed);
woken.swap(true, Ordering::SeqCst);
waker.wake_by_ref();
});
} else {
self.woken.swap(true, Ordering::Relaxed);
self.woken.swap(true, Ordering::SeqCst);
ctx.waker().wake_by_ref();
}
self.polled = true;
Poll::Pending
} else {
assert!(
self.woken.swap(false, Ordering::AcqRel),
"Inner stream polled before wake!"
self.woken.swap(false, Ordering::SeqCst),
format!("Inner stream polled before wake! {:?}", self.data.last())
);
self.polled = false;
Poll::Ready(self.data.pop())
Expand All @@ -126,27 +126,25 @@ fn flatten_unordered() {
let sleep_time = Duration::from_millis(self.base as u64);
thread::spawn(move || {
thread::sleep(sleep_time);
woken.swap(true, Ordering::Relaxed);
woken.swap(true, Ordering::SeqCst);
waker.wake_by_ref();
});
} else {
self.woken.swap(true, Ordering::Relaxed);
self.woken.swap(true, Ordering::SeqCst);
ctx.waker().wake_by_ref();
}
Poll::Pending
} else {
assert!(
self.woken.swap(false, Ordering::AcqRel),
"Stream polled before wake!"
self.woken.swap(false, Ordering::SeqCst),
format!("Stream polled before wake! {}", self.base)
);
let data: Vec<_> = (0..6).into_iter().map(|v| v + self.base * 6).collect();
self.base += 1;
self.polled = false;
Poll::Ready(Some(DataStream {
polled: false,
data: vec![9, 8, 7, 6, 5]
.into_iter()
.map(|v| v * self.base)
.collect(),
data,
wake_immediately: self.wake_immediately && self.base % 2 == 0,
woken: Arc::new(AtomicBool::new(false)),
}))
Expand All @@ -156,9 +154,28 @@ fn flatten_unordered() {

// concurrent tests
block_on(async {
let fm_unordered = Interchanger {
let mut fl_unordered = Interchanger {
polled: false,
base: 1,
base: 0,
woken: Arc::new(AtomicBool::new(false)),
wake_immediately: false,
}
.take(10)
.map(|s| s.map(identity))
.flatten()
.collect::<Vec<_>>()
.await;

fl_unordered.sort();

assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
});

// concurrent tests
block_on(async {
let mut fm_unordered = Interchanger {
polled: false,
base: 0,
woken: Arc::new(AtomicBool::new(false)),
wake_immediately: false,
}
Expand All @@ -167,7 +184,9 @@ fn flatten_unordered() {
.collect::<Vec<_>>()
.await;

assert_eq!(fm_unordered.len(), 50);
fm_unordered.sort();

assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});

// basic behaviour
Expand Down Expand Up @@ -209,9 +228,9 @@ fn flatten_unordered() {

// wake up immmediately
block_on(async {
let fl_unordered = Interchanger {
let mut fl_unordered = Interchanger {
polled: false,
base: 1,
base: 0,
woken: Arc::new(AtomicBool::new(false)),
wake_immediately: true,
}
Expand All @@ -221,14 +240,16 @@ fn flatten_unordered() {
.collect::<Vec<_>>()
.await;

assert_eq!(fl_unordered.len(), 50);
fl_unordered.sort();

assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
});

// wake up immmediately
block_on(async {
let fm_unordered = Interchanger {
let mut fm_unordered = Interchanger {
polled: false,
base: 1,
base: 0,
woken: Arc::new(AtomicBool::new(false)),
wake_immediately: true,
}
Expand All @@ -237,7 +258,9 @@ fn flatten_unordered() {
.collect::<Vec<_>>()
.await;

assert_eq!(fm_unordered.len(), 50);
fm_unordered.sort();

assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
});
}

Expand Down

0 comments on commit 2ca3e96

Please sign in to comment.