Skip to content

Commit

Permalink
Backport to 0.1: Avoid starvation from FuturesUnordered::poll_next
Browse files Browse the repository at this point in the history
This backports #2049 to the
0.1 branch. Without this change, polling > 200 futures trough a
FuturesUnordered on a Tokio 0.2 executor results in a busy loop in
Tokio's cooperative scheduling module.

See for a repro of where this breaks:
tokio-rs/tokio#2390

Tested by running the reproducer I submitted there. Without this change,
it hangs forever (spinning on CPU). With the change, it doesn't.
  • Loading branch information
krallin authored and cramertj committed Apr 22, 2020
1 parent a00d35e commit b85f56d
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions src/stream/futures_unordered.rs
Expand Up @@ -15,6 +15,23 @@ use {task, Stream, Future, Poll, Async};
use executor::{Notify, UnsafeNotify, NotifyHandle};
use task_impl::{self, AtomicTask};

/// Constant used for a `FuturesUnordered` to determine how many times it is
/// allowed to poll underlying futures without yielding.
///
/// A single call to `poll_next` may potentially do a lot of work before
/// yielding. This happens in particular if the underlying futures are awoken
/// frequently but continue to return `Pending`. This is problematic if other
/// tasks are waiting on the executor, since they do not get to run. This value
/// caps the number of calls to `poll` on underlying futures a single call to
/// `poll_next` is allowed to make.
///
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
/// that amortize wakeup and scheduling costs, but low enough that we do not
/// starve other tasks for long.
///
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
const YIELD_EVERY: usize = 32;

/// An unbounded set of futures.
///
/// This "combinator" also serves a special function in this library, providing
Expand Down Expand Up @@ -274,6 +291,10 @@ impl<T> Stream for FuturesUnordered<T>
type Error = T::Error;

fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;

// Ensure `parent` is correctly set.
self.inner.parent.register();

Expand Down Expand Up @@ -369,12 +390,21 @@ impl<T> Stream for FuturesUnordered<T>
future.poll()
})
};
polled += 1;

let ret = match res {
Ok(Async::NotReady) => {
let node = bomb.node.take().unwrap();
*node.future.get() = Some(future);
bomb.queue.link(node);

if polled == YIELD_EVERY {
// We have polled a large number of futures in a row without yielding.
// To ensure we do not starve other tasks waiting on the executor,
// we yield here, but immediately wake ourselves up to continue.
task_impl::current().notify();
return Ok(Async::NotReady);
}
continue
}
Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),
Expand Down

0 comments on commit b85f56d

Please sign in to comment.