From 1324ea9550306422ff5224a373ff1a58396e5414 Mon Sep 17 00:00:00 2001 From: olegnn Date: Fri, 12 Jun 2020 13:27:22 +0300 Subject: [PATCH] Poll both in case if ther's nothing to poll (poll was invoked independently from wakers) + update benchmark --- futures-util/benches/flatten_unordered.rs | 17 ++++++++------- .../src/stream/stream/flatten_unordered.rs | 21 ++++++++++++------- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/futures-util/benches/flatten_unordered.rs b/futures-util/benches/flatten_unordered.rs index ef6cd40717..64d5f9a4e3 100644 --- a/futures-util/benches/flatten_unordered.rs +++ b/futures-util/benches/flatten_unordered.rs @@ -12,9 +12,9 @@ use std::collections::VecDeque; use std::thread; #[bench] -fn inner_oneshots(b: &mut Bencher) { - const STREAM_COUNT: usize = 1_000; - const STREAM_ITEM_COUNT: usize = 10; +fn oneshot_streams(b: &mut Bencher) { + const STREAM_COUNT: usize = 10_000; + const STREAM_ITEM_COUNT: usize = 1; b.iter(|| { let mut txs = VecDeque::with_capacity(STREAM_COUNT); @@ -27,10 +27,10 @@ fn inner_oneshots(b: &mut Bencher) { } thread::spawn(move || { - let mut m = 1; + let mut last = 1; while let Some(tx) = txs.pop_front() { - let _ = tx.send(stream::iter(m..m + STREAM_ITEM_COUNT)); - m += STREAM_ITEM_COUNT; + let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT)); + last += STREAM_ITEM_COUNT; } }); @@ -52,11 +52,14 @@ fn inner_oneshots(b: &mut Bencher) { loop { match flatten.poll_next_unpin(cx) { Poll::Ready(None) => break, - Poll::Ready(Some(_)) => count += 1, + Poll::Ready(Some(_)) => { + count += 1; + } _ => {} } } assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT); + Poll::Ready(()) })) }); diff --git a/futures-util/src/stream/stream/flatten_unordered.rs b/futures-util/src/stream/stream/flatten_unordered.rs index 85e8fc381e..59496ac12c 100644 --- a/futures-util/src/stream/stream/flatten_unordered.rs +++ b/futures-util/src/stream/stream/flatten_unordered.rs @@ -25,7 +25,7 @@ const NEED_TO_POLL_FUTURES: u8 = 1; const NEED_TO_POLL_STREAM: u8 = 0b10; /// Indicates that it needs to poll something. -const NEED_TO_POLL: u8 = NEED_TO_POLL_FUTURES | NEED_TO_POLL_STREAM; +const NEED_TO_POLL_BOTH: u8 = NEED_TO_POLL_FUTURES | NEED_TO_POLL_STREAM; /// Indicates that current stream is polled at the moment. const POLLING: u8 = 0b100; @@ -63,7 +63,7 @@ impl SharedPollState { /// with non-`POLLING` state, and returns disjunction result. fn end_polling(&self, mut to_poll: u8) -> u8 { to_poll |= self.state.swap(!POLLING & !WOKEN, Ordering::SeqCst); - self.state.fetch_and(to_poll & !POLLING & !WOKEN, Ordering::SeqCst); + self.state.swap(to_poll & !POLLING & !WOKEN, Ordering::SeqCst); to_poll } } @@ -101,8 +101,8 @@ impl ArcWake for PollWaker { // Only call waker if stream isn't being polled because it will be called // at the end of polling if state was changed. if poll_state_value & (POLLING | WOKEN) == NONE { - self_arc.poll_state.set_or(WOKEN); if let Some(Some(inner_waker)) = unsafe { self_arc.inner_waker.get().as_ref() } { + self_arc.poll_state.set_or(WOKEN); inner_waker.wake_by_ref(); } } @@ -248,7 +248,12 @@ where let mut this = self.project(); let mut poll_state_value = this.poll_state.begin_polling(); - let mut polling_with_two_wakers = poll_state_value & NEED_TO_POLL == NEED_TO_POLL && !stream_will_be_woken; + + if poll_state_value & NEED_TO_POLL_BOTH == NONE { + poll_state_value = NEED_TO_POLL_STREAM | if this.futures.is_empty() { NONE } else { NEED_TO_POLL_FUTURES }; + } + + let mut polling_with_two_wakers = poll_state_value & NEED_TO_POLL_BOTH == NEED_TO_POLL_BOTH && !stream_will_be_woken; if poll_state_value & NEED_TO_POLL_STREAM != NONE { if !stream_will_be_woken { @@ -323,7 +328,7 @@ where let is_done = *this.is_stream_done && this.futures.is_empty(); - if !is_done && poll_state_value & WOKEN == NONE && poll_state_value & NEED_TO_POLL != NONE + if !is_done && poll_state_value & WOKEN == NONE && poll_state_value & NEED_TO_POLL_BOTH != NONE && (polling_with_two_wakers || poll_state_value & NEED_TO_POLL_FUTURES != NONE && !futures_will_be_woken || poll_state_value & NEED_TO_POLL_STREAM != NONE && !stream_will_be_woken) @@ -331,10 +336,10 @@ where ctx.waker().wake_by_ref(); } - if next_item.is_none() && !is_done { - Poll::Pending - } else { + if next_item.is_some() || is_done { Poll::Ready(next_item) + } else { + Poll::Pending } } }