Skip to content

Commit

Permalink
FuturesUnordered: Respect yielding from future (#2551)
Browse files Browse the repository at this point in the history
Co-authored-by: Jon Gjengset <jon@thesquareplanet.com>
  • Loading branch information
taiki-e and jonhoo committed Feb 6, 2022
1 parent 4f3b98e commit e62f534
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 37 deletions.
53 changes: 18 additions & 35 deletions futures-util/src/stream/futures_unordered/mod.rs
Expand Up @@ -6,7 +6,6 @@
use crate::task::AtomicWaker;
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
use core::cmp;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
use core::marker::PhantomData;
Expand All @@ -31,33 +30,6 @@ use self::task::Task;
mod ready_to_run_queue;
use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue};

/// Constant used for a `FuturesUnordered` to determine how many times it is
/// allowed to poll underlying futures without yielding.
///
/// A single call to `poll_next` may potentially do a lot of work before
/// yielding. This happens in particular if the underlying futures are awoken
/// frequently but continue to return `Pending`. This is problematic if other
/// tasks are waiting on the executor, since they do not get to run. This value
/// caps the number of calls to `poll` on underlying futures a single call to
/// `poll_next` is allowed to make.
///
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
/// that amortize wakeup and scheduling costs, but low enough that we do not
/// starve other tasks for long.
///
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
///
/// Note that using the length of the `FuturesUnordered` instead of this value
/// may cause problems if the number of futures is large.
/// See also https://github.com/rust-lang/futures-rs/pull/2527.
///
/// Additionally, polling the same future twice per iteration may cause another
/// problem. So, when using this value, it is necessary to limit the max value
/// based on the length of the `FuturesUnordered`.
/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`)
/// See also https://github.com/rust-lang/futures-rs/pull/2333.
const YIELD_EVERY: usize = 32;

/// A set of futures which may complete in any order.
///
/// This structure is optimized to manage a large number of futures.
Expand Down Expand Up @@ -149,6 +121,7 @@ impl<Fut> FuturesUnordered<Fut> {
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
ready_to_run_queue: Weak::new(),
woken: AtomicBool::new(false),
});
let stub_ptr = Arc::as_ptr(&stub);
let ready_to_run_queue = Arc::new(ReadyToRunQueue {
Expand Down Expand Up @@ -195,6 +168,7 @@ impl<Fut> FuturesUnordered<Fut> {
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue),
woken: AtomicBool::new(false),
});

// Reset the `is_terminated` flag if we've previously marked ourselves
Expand Down Expand Up @@ -411,12 +385,12 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
type Item = Fut::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// See YIELD_EVERY docs for more.
let yield_every = cmp::min(self.len(), YIELD_EVERY);
let len = self.len();

// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;
let mut yielded = 0;

// Ensure `parent` is correctly set.
self.ready_to_run_queue.waker.register(cx.waker());
Expand Down Expand Up @@ -527,7 +501,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
// the internal allocation, appropriately accessing fields and
// deallocating the task if need be.
let res = {
let waker = Task::waker_ref(bomb.task.as_ref().unwrap());
let task = bomb.task.as_ref().unwrap();
// We are only interested in whether the future is awoken before it
// finishes polling, so reset the flag here.
task.woken.store(false, Relaxed);
let waker = Task::waker_ref(task);
let mut cx = Context::from_waker(&waker);

// Safety: We won't move the future ever again
Expand All @@ -540,12 +518,17 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
match res {
Poll::Pending => {
let task = bomb.task.take().unwrap();
// If the future was awoken during polling, we assume
// the future wanted to explicitly yield.
yielded += task.woken.load(Relaxed) as usize;
bomb.queue.link(task);

if polled == yield_every {
// We have polled a large number of futures in a row without yielding.
// To ensure we do not starve other tasks waiting on the executor,
// we yield here, but immediately wake ourselves up to continue.
// If a future yields, we respect it and yield here.
// If all futures have been polled, we also yield here to
// avoid starving other tasks waiting on the executor.
// (polling the same future twice per iteration may cause
// the problem: https://github.com/rust-lang/futures-rs/pull/2333)
if yielded >= 2 || polled == len {
cx.waker().wake_by_ref();
return Poll::Pending;
}
Expand Down
9 changes: 8 additions & 1 deletion futures-util/src/stream/futures_unordered/task.rs
@@ -1,6 +1,6 @@
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
use core::sync::atomic::Ordering::{self, SeqCst};
use core::sync::atomic::Ordering::{self, Relaxed, SeqCst};
use core::sync::atomic::{AtomicBool, AtomicPtr};

use super::abort::abort;
Expand Down Expand Up @@ -31,6 +31,11 @@ pub(super) struct Task<Fut> {

// Whether or not this task is currently in the ready to run queue
pub(super) queued: AtomicBool,

// Whether the future was awoken during polling
// It is possible for this flag to be set to true after the polling,
// but it will be ignored.
pub(super) woken: AtomicBool,
}

// `Task` can be sent across threads safely because it ensures that
Expand All @@ -48,6 +53,8 @@ impl<Fut> ArcWake for Task<Fut> {
None => return,
};

arc_self.woken.store(true, Relaxed);

// It's our job to enqueue this task it into the ready to run queue. To
// do this we set the `queued` flag, and if successful we then do the
// actual queueing operation, ensuring that we're only queued once.
Expand Down
2 changes: 1 addition & 1 deletion futures/tests/stream_futures_unordered.rs
Expand Up @@ -342,7 +342,7 @@ fn polled_only_once_at_most_per_iteration() {

let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
assert!(tasks.poll_next_unpin(cx).is_pending());
assert_eq!(32, tasks.iter().filter(|f| f.polled).count());
assert_eq!(33, tasks.iter().filter(|f| f.polled).count());

let mut tasks = FuturesUnordered::<F>::new();
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
Expand Down

0 comments on commit e62f534

Please sign in to comment.