From cfe1d82c153daf97520ca0433c0b7e088360a552 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Wed, 24 Feb 2021 12:25:47 +0000 Subject: [PATCH] Backport to 0.1: FuturesUnordered: Do not poll the same future twice per iteration (#2358) Same as #2333. The same issue exists in 0.1, so backporting it there helps for code that is still using Futures 0.1 in some places. --- src/stream/futures_unordered.rs | 35 ++++++++++++++--------------- tests/futures_unordered.rs | 40 ++++++++++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 19 deletions(-) diff --git a/src/stream/futures_unordered.rs b/src/stream/futures_unordered.rs index d529d93fe9..3f25c86f39 100644 --- a/src/stream/futures_unordered.rs +++ b/src/stream/futures_unordered.rs @@ -15,23 +15,6 @@ use {task, Stream, Future, Poll, Async}; use executor::{Notify, UnsafeNotify, NotifyHandle}; use task_impl::{self, AtomicTask}; -/// Constant used for a `FuturesUnordered` to determine how many times it is -/// allowed to poll underlying futures without yielding. -/// -/// A single call to `poll_next` may potentially do a lot of work before -/// yielding. This happens in particular if the underlying futures are awoken -/// frequently but continue to return `Pending`. This is problematic if other -/// tasks are waiting on the executor, since they do not get to run. This value -/// caps the number of calls to `poll` on underlying futures a single call to -/// `poll_next` is allowed to make. -/// -/// The value itself is chosen somewhat arbitrarily. It needs to be high enough -/// that amortize wakeup and scheduling costs, but low enough that we do not -/// starve other tasks for long. -/// -/// See also https://github.com/rust-lang/futures-rs/issues/2047. -const YIELD_EVERY: usize = 32; - /// An unbounded set of futures. /// /// This "combinator" also serves a special function in this library, providing @@ -291,6 +274,22 @@ impl Stream for FuturesUnordered type Error = T::Error; fn poll(&mut self) -> Poll, T::Error> { + // Variable to determine how many times it is allowed to poll underlying + // futures without yielding. + // + // A single call to `poll_next` may potentially do a lot of work before + // yielding. This happens in particular if the underlying futures are awoken + // frequently but continue to return `Pending`. This is problematic if other + // tasks are waiting on the executor, since they do not get to run. This value + // caps the number of calls to `poll` on underlying futures a single call to + // `poll_next` is allowed to make. + // + // The value is the length of FuturesUnordered. This ensures that each + // future is polled only once at most per iteration. + // + // See also https://github.com/rust-lang/futures-rs/issues/2047. + let yield_every = self.len(); + // Keep track of how many child futures we have polled, // in case we want to forcibly yield. let mut polled = 0; @@ -398,7 +397,7 @@ impl Stream for FuturesUnordered *node.future.get() = Some(future); bomb.queue.link(node); - if polled == YIELD_EVERY { + if polled == yield_every { // We have polled a large number of futures in a row without yielding. // To ensure we do not starve other tasks waiting on the executor, // we yield here, but immediately wake ourselves up to continue. diff --git a/tests/futures_unordered.rs b/tests/futures_unordered.rs index e29659f011..325a6f3e48 100644 --- a/tests/futures_unordered.rs +++ b/tests/futures_unordered.rs @@ -5,7 +5,8 @@ extern crate futures; use std::any::Any; use futures::sync::oneshot; -use futures::stream::futures_unordered; +use std::iter::FromIterator; +use futures::stream::{futures_unordered, FuturesUnordered}; use futures::prelude::*; mod support; @@ -127,3 +128,40 @@ fn iter_mut_len() { assert_eq!(iter_mut.len(), 0); assert!(iter_mut.next().is_none()); } + +#[test] +fn polled_only_once_at_most_per_iteration() { + #[derive(Debug, Clone, Copy, Default)] + struct F { + polled: bool, + } + + impl Future for F { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Result, Self::Error> { + if self.polled { + panic!("polled twice") + } else { + self.polled = true; + Ok(Async::NotReady) + } + } + } + + + let tasks = FuturesUnordered::from_iter(vec![F::default(); 10]); + let mut tasks = futures::executor::spawn(tasks); + assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); + assert_eq!(10, tasks.get_mut().iter_mut().filter(|f| f.polled).count()); + + let tasks = FuturesUnordered::from_iter(vec![F::default(); 33]); + let mut tasks = futures::executor::spawn(tasks); + assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); + assert_eq!(33, tasks.get_mut().iter_mut().filter(|f| f.polled).count()); + + let tasks = FuturesUnordered::::new(); + let mut tasks = futures::executor::spawn(tasks); + assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready()); +}