Skip to content

Commit

Permalink
Avoid starvation from FuturesUnordered::poll_next
Browse files Browse the repository at this point in the history
  • Loading branch information
jonhoo authored and cramertj committed Jan 27, 2020
1 parent 3f0e90c commit e767803
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions futures-util/src/stream/futures_unordered/mod.rs
Expand Up @@ -39,6 +39,23 @@ use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue};
/// without running out of ram.
const TERMINATED_SENTINEL_LENGTH: usize = usize::max_value();

/// Constant used for a `FuturesUnordered` to determine how many times it is
/// allowed to poll underlying futures without yielding.
///
/// A single call to `poll_next` may potentially do a lot of work before
/// yielding. This happens in particular if the underlying futures are awoken
/// frequently but continue to return `Pending`. This is problematic if other
/// tasks are waiting on the executor, since they do not get to run. This value
/// caps the number of calls to `poll` on underlying futures a single call to
/// `poll_next` is allowed to make.
///
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
/// that amortize wakeup and scheduling costs, but low enough that we do not
/// starve other tasks for long.
///
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
const YIELD_EVERY: usize = 32;

/// A set of futures which may complete in any order.
///
/// This structure is optimized to manage a large number of futures.
Expand Down Expand Up @@ -313,6 +330,10 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;

// Ensure `parent` is correctly set.
self.ready_to_run_queue.waker.register(cx.waker());

Expand Down Expand Up @@ -433,11 +454,20 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {

future.poll(&mut cx)
};
polled += 1;

match res {
Poll::Pending => {
let task = bomb.task.take().unwrap();
bomb.queue.link(task);

if polled == YIELD_EVERY {
// We have polled a large number of futures in a row without yielding.
// To ensure we do not starve other tasks waiting on the executor,
// we yield here, but immediately wake ourselves up to continue.
cx.waker().wake_by_ref();
return Poll::Pending;
}
continue
}
Poll::Ready(output) => {
Expand Down

0 comments on commit e767803

Please sign in to comment.