From d4eebb468575290c86568faa73df6c4562c505b7 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Fri, 14 Jan 2022 01:50:56 +0900 Subject: [PATCH 1/3] FuturesUnordered: Respect yielding from future --- .../src/stream/futures_unordered/mod.rs | 52 ++++++------------- .../src/stream/futures_unordered/task.rs | 9 +++- futures/tests/stream_futures_unordered.rs | 4 +- 3 files changed, 28 insertions(+), 37 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 46a0ae5bde..d176a715a2 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -6,7 +6,6 @@ use crate::task::AtomicWaker; use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; -use core::cmp; use core::fmt::{self, Debug}; use core::iter::FromIterator; use core::marker::PhantomData; @@ -31,33 +30,6 @@ use self::task::Task; mod ready_to_run_queue; use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue}; -/// Constant used for a `FuturesUnordered` to determine how many times it is -/// allowed to poll underlying futures without yielding. -/// -/// A single call to `poll_next` may potentially do a lot of work before -/// yielding. This happens in particular if the underlying futures are awoken -/// frequently but continue to return `Pending`. This is problematic if other -/// tasks are waiting on the executor, since they do not get to run. This value -/// caps the number of calls to `poll` on underlying futures a single call to -/// `poll_next` is allowed to make. -/// -/// The value itself is chosen somewhat arbitrarily. It needs to be high enough -/// that amortize wakeup and scheduling costs, but low enough that we do not -/// starve other tasks for long. -/// -/// See also https://github.com/rust-lang/futures-rs/issues/2047. -/// -/// Note that using the length of the `FuturesUnordered` instead of this value -/// may cause problems if the number of futures is large. -/// See also https://github.com/rust-lang/futures-rs/pull/2527. -/// -/// Additionally, polling the same future twice per iteration may cause another -/// problem. So, when using this value, it is necessary to limit the max value -/// based on the length of the `FuturesUnordered`. -/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`) -/// See also https://github.com/rust-lang/futures-rs/pull/2333. -const YIELD_EVERY: usize = 32; - /// A set of futures which may complete in any order. /// /// This structure is optimized to manage a large number of futures. @@ -149,6 +121,7 @@ impl FuturesUnordered { next_ready_to_run: AtomicPtr::new(ptr::null_mut()), queued: AtomicBool::new(true), ready_to_run_queue: Weak::new(), + woken: AtomicBool::new(false), }); let stub_ptr = Arc::as_ptr(&stub); let ready_to_run_queue = Arc::new(ReadyToRunQueue { @@ -195,6 +168,7 @@ impl FuturesUnordered { next_ready_to_run: AtomicPtr::new(ptr::null_mut()), queued: AtomicBool::new(true), ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue), + woken: AtomicBool::new(false), }); // Reset the `is_terminated` flag if we've previously marked ourselves @@ -411,8 +385,7 @@ impl Stream for FuturesUnordered { type Item = Fut::Output; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // See YIELD_EVERY docs for more. - let yield_every = cmp::min(self.len(), YIELD_EVERY); + let len = self.len(); // Keep track of how many child futures we have polled, // in case we want to forcibly yield. @@ -527,7 +500,11 @@ impl Stream for FuturesUnordered { // the internal allocation, appropriately accessing fields and // deallocating the task if need be. let res = { - let waker = Task::waker_ref(bomb.task.as_ref().unwrap()); + let task = bomb.task.as_ref().unwrap(); + // We are only interested in whether the future waken before it + // finishes polling, so reset the flag here. + task.woken.store(false, Relaxed); + let waker = Task::waker_ref(task); let mut cx = Context::from_waker(&waker); // Safety: We won't move the future ever again @@ -540,12 +517,17 @@ impl Stream for FuturesUnordered { match res { Poll::Pending => { let task = bomb.task.take().unwrap(); + // If the future waken before it finishes polling, we assume + // the future yields. + let yielded = task.woken.load(Relaxed); bomb.queue.link(task); - if polled == yield_every { - // We have polled a large number of futures in a row without yielding. - // To ensure we do not starve other tasks waiting on the executor, - // we yield here, but immediately wake ourselves up to continue. + // If a future yields, we respect it and yield here. + // If all futures have been polled, we also yield here to + // avoid starving other tasks waiting on the executor. + // (polling the same future twice per iteration may cause + // the problem: https://github.com/rust-lang/futures-rs/pull/2333) + if yielded || polled == len { cx.waker().wake_by_ref(); return Poll::Pending; } diff --git a/futures-util/src/stream/futures_unordered/task.rs b/futures-util/src/stream/futures_unordered/task.rs index 5216199831..2ca6c3c8af 100644 --- a/futures-util/src/stream/futures_unordered/task.rs +++ b/futures-util/src/stream/futures_unordered/task.rs @@ -1,6 +1,6 @@ use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; -use core::sync::atomic::Ordering::{self, SeqCst}; +use core::sync::atomic::Ordering::{self, Relaxed, SeqCst}; use core::sync::atomic::{AtomicBool, AtomicPtr}; use super::abort::abort; @@ -31,6 +31,11 @@ pub(super) struct Task { // Whether or not this task is currently in the ready to run queue pub(super) queued: AtomicBool, + + // Whether the future waken before it finishes polling + // It is possible for this flag to be set to true after the polling, + // but it will be ignored. + pub(super) woken: AtomicBool, } // `Task` can be sent across threads safely because it ensures that @@ -48,6 +53,8 @@ impl ArcWake for Task { None => return, }; + arc_self.woken.store(true, Relaxed); + // It's our job to enqueue this task it into the ready to run queue. To // do this we set the `queued` flag, and if successful we then do the // actual queueing operation, ensuring that we're only queued once. diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 7c86c894fa..26954c6a05 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -268,6 +268,8 @@ fn futures_not_moved_after_poll() { let fut = future::ready(()).pending_once().assert_unmoved(); let mut stream = vec![fut; 3].into_iter().collect::>(); assert_stream_pending!(stream); + assert_stream_pending!(stream); + assert_stream_pending!(stream); assert_stream_next!(stream, ()); assert_stream_next!(stream, ()); assert_stream_next!(stream, ()); @@ -342,7 +344,7 @@ fn polled_only_once_at_most_per_iteration() { let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]); assert!(tasks.poll_next_unpin(cx).is_pending()); - assert_eq!(32, tasks.iter().filter(|f| f.polled).count()); + assert_eq!(33, tasks.iter().filter(|f| f.polled).count()); let mut tasks = FuturesUnordered::::new(); assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx)); From dd86ee19be1d0bd229fa1735675e0a56a69353e7 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 22 Jan 2022 14:14:30 +0900 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: Jon Gjengset --- futures-util/src/stream/futures_unordered/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index d176a715a2..15b932afc4 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -501,7 +501,7 @@ impl Stream for FuturesUnordered { // deallocating the task if need be. let res = { let task = bomb.task.as_ref().unwrap(); - // We are only interested in whether the future waken before it + // We are only interested in whether the future is awoken before it // finishes polling, so reset the flag here. task.woken.store(false, Relaxed); let waker = Task::waker_ref(task); @@ -517,8 +517,8 @@ impl Stream for FuturesUnordered { match res { Poll::Pending => { let task = bomb.task.take().unwrap(); - // If the future waken before it finishes polling, we assume - // the future yields. + // If the future was awoken during polling, we assume + // the future wanted to explicitly yield. let yielded = task.woken.load(Relaxed); bomb.queue.link(task); From b6afb23a602e5ba39dc211e14aab7fbfc9771df1 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 6 Feb 2022 13:18:22 +0900 Subject: [PATCH 3/3] tweak heuristic --- futures-util/src/stream/futures_unordered/mod.rs | 5 +++-- futures-util/src/stream/futures_unordered/task.rs | 2 +- futures/tests/stream_futures_unordered.rs | 2 -- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 15b932afc4..fdbd53de8e 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -390,6 +390,7 @@ impl Stream for FuturesUnordered { // Keep track of how many child futures we have polled, // in case we want to forcibly yield. let mut polled = 0; + let mut yielded = 0; // Ensure `parent` is correctly set. self.ready_to_run_queue.waker.register(cx.waker()); @@ -519,7 +520,7 @@ impl Stream for FuturesUnordered { let task = bomb.task.take().unwrap(); // If the future was awoken during polling, we assume // the future wanted to explicitly yield. - let yielded = task.woken.load(Relaxed); + yielded += task.woken.load(Relaxed) as usize; bomb.queue.link(task); // If a future yields, we respect it and yield here. @@ -527,7 +528,7 @@ impl Stream for FuturesUnordered { // avoid starving other tasks waiting on the executor. // (polling the same future twice per iteration may cause // the problem: https://github.com/rust-lang/futures-rs/pull/2333) - if yielded || polled == len { + if yielded >= 2 || polled == len { cx.waker().wake_by_ref(); return Poll::Pending; } diff --git a/futures-util/src/stream/futures_unordered/task.rs b/futures-util/src/stream/futures_unordered/task.rs index 2ca6c3c8af..ec2114effa 100644 --- a/futures-util/src/stream/futures_unordered/task.rs +++ b/futures-util/src/stream/futures_unordered/task.rs @@ -32,7 +32,7 @@ pub(super) struct Task { // Whether or not this task is currently in the ready to run queue pub(super) queued: AtomicBool, - // Whether the future waken before it finishes polling + // Whether the future was awoken during polling // It is possible for this flag to be set to true after the polling, // but it will be ignored. pub(super) woken: AtomicBool, diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 26954c6a05..8ddd62f4a8 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -268,8 +268,6 @@ fn futures_not_moved_after_poll() { let fut = future::ready(()).pending_once().assert_unmoved(); let mut stream = vec![fut; 3].into_iter().collect::>(); assert_stream_pending!(stream); - assert_stream_pending!(stream); - assert_stream_pending!(stream); assert_stream_next!(stream, ()); assert_stream_next!(stream, ()); assert_stream_next!(stream, ());