Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FuturesUnordered: Do not poll the same future twice per iteration #2333

Merged
merged 2 commits into from Feb 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 17 additions & 17 deletions futures-util/src/stream/futures_unordered/mod.rs
Expand Up @@ -30,22 +30,6 @@ use self::task::Task;
mod ready_to_run_queue;
use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue};

/// Constant used for a `FuturesUnordered` to determine how many times it is
/// allowed to poll underlying futures without yielding.
///
/// A single call to `poll_next` may potentially do a lot of work before
/// yielding. This happens in particular if the underlying futures are awoken
/// frequently but continue to return `Pending`. This is problematic if other
/// tasks are waiting on the executor, since they do not get to run. This value
/// caps the number of calls to `poll` on underlying futures a single call to
/// `poll_next` is allowed to make.
///
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
/// that amortize wakeup and scheduling costs, but low enough that we do not
/// starve other tasks for long.
///
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
const YIELD_EVERY: usize = 32;

/// A set of futures which may complete in any order.
///
Expand Down Expand Up @@ -414,6 +398,22 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
// Variable to determine how many times it is allowed to poll underlying
// futures without yielding.
//
// A single call to `poll_next` may potentially do a lot of work before
// yielding. This happens in particular if the underlying futures are awoken
// frequently but continue to return `Pending`. This is problematic if other
// tasks are waiting on the executor, since they do not get to run. This value
// caps the number of calls to `poll` on underlying futures a single call to
// `poll_next` is allowed to make.
//
// The value is the length of FuturesUnordered. This ensures that each
// future is polled only once at most per iteration.
//
// See also https://github.com/rust-lang/futures-rs/issues/2047.
let yield_every = self.len();

// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;
Expand Down Expand Up @@ -548,7 +548,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
let task = bomb.task.take().unwrap();
bomb.queue.link(task);

if polled == YIELD_EVERY {
if polled == yield_every {
// We have polled a large number of futures in a row without yielding.
// To ensure we do not starve other tasks waiting on the executor,
// we yield here, but immediately wake ourselves up to continue.
Expand Down
44 changes: 43 additions & 1 deletion futures/tests/futures_unordered.rs
@@ -1,3 +1,10 @@
use futures::future::Future;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::task::{Context, Poll};
use futures_test::task::noop_context;
use std::iter::FromIterator;
use std::pin::Pin;

#[test]
fn is_terminated() {
use futures::future;
Expand Down Expand Up @@ -270,12 +277,13 @@ fn futures_not_moved_after_poll() {
use futures::future;
use futures::stream::FuturesUnordered;
use futures_test::future::FutureTestExt;
use futures_test::{assert_stream_done, assert_stream_next};
use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};

// Future that will be ready after being polled twice,
// asserting that it does not move.
let fut = future::ready(()).pending_once().assert_unmoved();
let mut stream = vec![fut; 3].into_iter().collect::<FuturesUnordered<_>>();
assert_stream_pending!(stream);
assert_stream_next!(stream, ());
assert_stream_next!(stream, ());
assert_stream_next!(stream, ());
Expand Down Expand Up @@ -326,3 +334,37 @@ fn len_valid_during_out_of_order_completion() {
assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(7))));
assert_eq!(stream.len(), 0);
}

#[test]
fn polled_only_once_at_most_per_iteration() {
#[derive(Debug, Clone, Copy, Default)]
struct F {
polled: bool,
}

impl Future for F {
type Output = ();

fn poll(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> {
if self.polled {
panic!("polled twice")
} else {
self.polled = true;
Poll::Pending
}
}
}

let cx = &mut noop_context();

let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 10]);
assert!(tasks.poll_next_unpin(cx).is_pending());
assert_eq!(10, tasks.iter().filter(|f| f.polled).count());

let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
assert!(tasks.poll_next_unpin(cx).is_pending());
assert_eq!(33, tasks.iter().filter(|f| f.polled).count());

let mut tasks = FuturesUnordered::<F>::new();
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
}