From f2b2f32cfbc46d5d6f7c844f8f1e258110e974ec Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 20 Feb 2021 16:39:24 +0900 Subject: [PATCH] FuturesUnordered: Do not poll the same future twice per iteration (#2333) --- .../src/stream/futures_unordered/mod.rs | 34 +++++++------- futures/tests/futures_unordered.rs | 44 ++++++++++++++++++- 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 37b7d7e0ec..8dcc551e9f 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -30,22 +30,6 @@ use self::task::Task; mod ready_to_run_queue; use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue}; -/// Constant used for a `FuturesUnordered` to determine how many times it is -/// allowed to poll underlying futures without yielding. -/// -/// A single call to `poll_next` may potentially do a lot of work before -/// yielding. This happens in particular if the underlying futures are awoken -/// frequently but continue to return `Pending`. This is problematic if other -/// tasks are waiting on the executor, since they do not get to run. This value -/// caps the number of calls to `poll` on underlying futures a single call to -/// `poll_next` is allowed to make. -/// -/// The value itself is chosen somewhat arbitrarily. It needs to be high enough -/// that amortize wakeup and scheduling costs, but low enough that we do not -/// starve other tasks for long. -/// -/// See also https://github.com/rust-lang/futures-rs/issues/2047. -const YIELD_EVERY: usize = 32; /// A set of futures which may complete in any order. /// @@ -414,6 +398,22 @@ impl Stream for FuturesUnordered { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Variable to determine how many times it is allowed to poll underlying + // futures without yielding. + // + // A single call to `poll_next` may potentially do a lot of work before + // yielding. This happens in particular if the underlying futures are awoken + // frequently but continue to return `Pending`. This is problematic if other + // tasks are waiting on the executor, since they do not get to run. This value + // caps the number of calls to `poll` on underlying futures a single call to + // `poll_next` is allowed to make. + // + // The value is the length of FuturesUnordered. This ensures that each + // future is polled only once at most per iteration. + // + // See also https://github.com/rust-lang/futures-rs/issues/2047. + let yield_every = self.len(); + // Keep track of how many child futures we have polled, // in case we want to forcibly yield. let mut polled = 0; @@ -548,7 +548,7 @@ impl Stream for FuturesUnordered { let task = bomb.task.take().unwrap(); bomb.queue.link(task); - if polled == YIELD_EVERY { + if polled == yield_every { // We have polled a large number of futures in a row without yielding. // To ensure we do not starve other tasks waiting on the executor, // we yield here, but immediately wake ourselves up to continue. diff --git a/futures/tests/futures_unordered.rs b/futures/tests/futures_unordered.rs index 076afb601b..dea382881d 100644 --- a/futures/tests/futures_unordered.rs +++ b/futures/tests/futures_unordered.rs @@ -1,3 +1,10 @@ +use futures::future::Future; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::{Context, Poll}; +use futures_test::task::noop_context; +use std::iter::FromIterator; +use std::pin::Pin; + #[test] fn is_terminated() { use futures::future; @@ -251,12 +258,13 @@ fn futures_not_moved_after_poll() { use futures::future; use futures::stream::FuturesUnordered; use futures_test::future::FutureTestExt; - use futures_test::{assert_stream_done, assert_stream_next}; + use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending}; // Future that will be ready after being polled twice, // asserting that it does not move. let fut = future::ready(()).pending_once().assert_unmoved(); let mut stream = vec![fut; 3].into_iter().collect::>(); + assert_stream_pending!(stream); assert_stream_next!(stream, ()); assert_stream_next!(stream, ()); assert_stream_next!(stream, ()); @@ -307,3 +315,37 @@ fn len_valid_during_out_of_order_completion() { assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(7)))); assert_eq!(stream.len(), 0); } + +#[test] +fn polled_only_once_at_most_per_iteration() { + #[derive(Debug, Clone, Copy, Default)] + struct F { + polled: bool, + } + + impl Future for F { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, _: &mut Context) -> Poll { + if self.polled { + panic!("polled twice") + } else { + self.polled = true; + Poll::Pending + } + } + } + + let cx = &mut noop_context(); + + let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 10]); + assert!(tasks.poll_next_unpin(cx).is_pending()); + assert_eq!(10, tasks.iter().filter(|f| f.polled).count()); + + let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]); + assert!(tasks.poll_next_unpin(cx).is_pending()); + assert_eq!(33, tasks.iter().filter(|f| f.polled).count()); + + let mut tasks = FuturesUnordered::::new(); + assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx)); +}