diff --git a/src/stream/futures_unordered.rs b/src/stream/futures_unordered.rs index a183d63406..d529d93fe9 100644 --- a/src/stream/futures_unordered.rs +++ b/src/stream/futures_unordered.rs @@ -15,6 +15,23 @@ use {task, Stream, Future, Poll, Async}; use executor::{Notify, UnsafeNotify, NotifyHandle}; use task_impl::{self, AtomicTask}; +/// Constant used for a `FuturesUnordered` to determine how many times it is +/// allowed to poll underlying futures without yielding. +/// +/// A single call to `poll_next` may potentially do a lot of work before +/// yielding. This happens in particular if the underlying futures are awoken +/// frequently but continue to return `Pending`. This is problematic if other +/// tasks are waiting on the executor, since they do not get to run. This value +/// caps the number of calls to `poll` on underlying futures a single call to +/// `poll_next` is allowed to make. +/// +/// The value itself is chosen somewhat arbitrarily. It needs to be high enough +/// that amortize wakeup and scheduling costs, but low enough that we do not +/// starve other tasks for long. +/// +/// See also https://github.com/rust-lang/futures-rs/issues/2047. +const YIELD_EVERY: usize = 32; + /// An unbounded set of futures. /// /// This "combinator" also serves a special function in this library, providing @@ -274,6 +291,10 @@ impl Stream for FuturesUnordered type Error = T::Error; fn poll(&mut self) -> Poll, T::Error> { + // Keep track of how many child futures we have polled, + // in case we want to forcibly yield. + let mut polled = 0; + // Ensure `parent` is correctly set. self.inner.parent.register(); @@ -369,12 +390,21 @@ impl Stream for FuturesUnordered future.poll() }) }; + polled += 1; let ret = match res { Ok(Async::NotReady) => { let node = bomb.node.take().unwrap(); *node.future.get() = Some(future); bomb.queue.link(node); + + if polled == YIELD_EVERY { + // We have polled a large number of futures in a row without yielding. + // To ensure we do not starve other tasks waiting on the executor, + // we yield here, but immediately wake ourselves up to continue. + task_impl::current().notify(); + return Ok(Async::NotReady); + } continue } Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),