Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport to 0.1: FuturesUnordered: Do not poll the same future twice per iteration #2358

Merged
merged 1 commit into from Feb 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 17 additions & 18 deletions src/stream/futures_unordered.rs
Expand Up @@ -15,23 +15,6 @@ use {task, Stream, Future, Poll, Async};
use executor::{Notify, UnsafeNotify, NotifyHandle};
use task_impl::{self, AtomicTask};

/// Constant used for a `FuturesUnordered` to determine how many times it is
/// allowed to poll underlying futures without yielding.
///
/// A single call to `poll_next` may potentially do a lot of work before
/// yielding. This happens in particular if the underlying futures are awoken
/// frequently but continue to return `Pending`. This is problematic if other
/// tasks are waiting on the executor, since they do not get to run. This value
/// caps the number of calls to `poll` on underlying futures a single call to
/// `poll_next` is allowed to make.
///
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
/// that amortize wakeup and scheduling costs, but low enough that we do not
/// starve other tasks for long.
///
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
const YIELD_EVERY: usize = 32;

/// An unbounded set of futures.
///
/// This "combinator" also serves a special function in this library, providing
Expand Down Expand Up @@ -291,6 +274,22 @@ impl<T> Stream for FuturesUnordered<T>
type Error = T::Error;

fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
// Variable to determine how many times it is allowed to poll underlying
// futures without yielding.
//
// A single call to `poll_next` may potentially do a lot of work before
// yielding. This happens in particular if the underlying futures are awoken
// frequently but continue to return `Pending`. This is problematic if other
// tasks are waiting on the executor, since they do not get to run. This value
// caps the number of calls to `poll` on underlying futures a single call to
// `poll_next` is allowed to make.
//
// The value is the length of FuturesUnordered. This ensures that each
// future is polled only once at most per iteration.
//
// See also https://github.com/rust-lang/futures-rs/issues/2047.
let yield_every = self.len();

// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;
Expand Down Expand Up @@ -398,7 +397,7 @@ impl<T> Stream for FuturesUnordered<T>
*node.future.get() = Some(future);
bomb.queue.link(node);

if polled == YIELD_EVERY {
if polled == yield_every {
// We have polled a large number of futures in a row without yielding.
// To ensure we do not starve other tasks waiting on the executor,
// we yield here, but immediately wake ourselves up to continue.
Expand Down
40 changes: 39 additions & 1 deletion tests/futures_unordered.rs
Expand Up @@ -5,7 +5,8 @@ extern crate futures;
use std::any::Any;

use futures::sync::oneshot;
use futures::stream::futures_unordered;
use std::iter::FromIterator;
use futures::stream::{futures_unordered, FuturesUnordered};
use futures::prelude::*;

mod support;
Expand Down Expand Up @@ -127,3 +128,40 @@ fn iter_mut_len() {
assert_eq!(iter_mut.len(), 0);
assert!(iter_mut.next().is_none());
}

#[test]
fn polled_only_once_at_most_per_iteration() {
#[derive(Debug, Clone, Copy, Default)]
struct F {
polled: bool,
}

impl Future for F {
type Item = ();
type Error = ();

fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
if self.polled {
panic!("polled twice")
} else {
self.polled = true;
Ok(Async::NotReady)
}
}
}


let tasks = FuturesUnordered::from_iter(vec![F::default(); 10]);
let mut tasks = futures::executor::spawn(tasks);
assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
assert_eq!(10, tasks.get_mut().iter_mut().filter(|f| f.polled).count());

let tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
let mut tasks = futures::executor::spawn(tasks);
assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
assert_eq!(33, tasks.get_mut().iter_mut().filter(|f| f.polled).count());

let tasks = FuturesUnordered::<F>::new();
let mut tasks = futures::executor::spawn(tasks);
assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready());
}