diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 8dc710cd34..bd7fa94f03 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -39,6 +39,23 @@ use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue}; /// without running out of ram. const TERMINATED_SENTINEL_LENGTH: usize = usize::max_value(); +/// Constant used for a `FuturesUnordered` to determine how many times it is +/// allowed to poll underlying futures without yielding. +/// +/// A single call to `poll_next` may potentially do a lot of work before +/// yielding. This happens in particular if the underlying futures are awoken +/// frequently but continue to return `Pending`. This is problematic if other +/// tasks are waiting on the executor, since they do not get to run. This value +/// caps the number of calls to `poll` on underlying futures a single call to +/// `poll_next` is allowed to make. +/// +/// The value itself is chosen somewhat arbitrarily. It needs to be high enough +/// that amortize wakeup and scheduling costs, but low enough that we do not +/// starve other tasks for long. +/// +/// See also https://github.com/rust-lang/futures-rs/issues/2047. +const YIELD_EVERY: usize = 32; + /// A set of futures which may complete in any order. /// /// This structure is optimized to manage a large number of futures. @@ -313,6 +330,10 @@ impl Stream for FuturesUnordered { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Keep track of how many child futures we have polled, + // in case we want to forcibly yield. + let mut polled = 0; + // Ensure `parent` is correctly set. self.ready_to_run_queue.waker.register(cx.waker()); @@ -433,11 +454,20 @@ impl Stream for FuturesUnordered { future.poll(&mut cx) }; + polled += 1; match res { Poll::Pending => { let task = bomb.task.take().unwrap(); bomb.queue.link(task); + + if polled == YIELD_EVERY { + // We have polled a large number of futures in a row without yielding. + // To ensure we do not starve other tasks waiting on the executor, + // we yield here, but immediately wake ourselves up to continue. + cx.waker().wake_by_ref(); + return Poll::Pending; + } continue } Poll::Ready(output) => {