Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid starvation from FuturesUnordered::poll_next #2049

Merged
merged 1 commit into from Jan 27, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 30 additions & 0 deletions futures-util/src/stream/futures_unordered/mod.rs
Expand Up @@ -39,6 +39,23 @@ use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue};
/// without running out of ram.
const TERMINATED_SENTINEL_LENGTH: usize = usize::max_value();

/// Constant used for a `FuturesUnordered` to determine how many times it is
/// allowed to poll underlying futures without yielding.
///
/// A single call to `poll_next` may potentially do a lot of work before
/// yielding. This happens in particular if the underlying futures are awoken
/// frequently but continue to return `Pending`. This is problematic if other
/// tasks are waiting on the executor, since they do not get to run. This value
/// caps the number of calls to `poll` on underlying futures a single call to
/// `poll_next` is allowed to make.
///
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
/// that amortize wakeup and scheduling costs, but low enough that we do not
/// starve other tasks for long.
///
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
const YIELD_EVERY: usize = 32;

/// A set of futures which may complete in any order.
///
/// This structure is optimized to manage a large number of futures.
Expand Down Expand Up @@ -313,6 +330,10 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;

// Ensure `parent` is correctly set.
self.ready_to_run_queue.waker.register(cx.waker());

Expand Down Expand Up @@ -433,11 +454,20 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {

future.poll(&mut cx)
};
polled += 1;

match res {
Poll::Pending => {
let task = bomb.task.take().unwrap();
bomb.queue.link(task);

if polled == YIELD_EVERY {
// We have polled a large number of futures in a row without yielding.
// To ensure we do not starve other tasks waiting on the executor,
// we yield here, but immediately wake ourselves up to continue.
cx.waker().wake_by_ref();
return Poll::Pending;
}
continue
}
Poll::Ready(output) => {
Expand Down