From b6afb23a602e5ba39dc211e14aab7fbfc9771df1 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 6 Feb 2022 13:18:22 +0900 Subject: [PATCH] tweak heuristic --- futures-util/src/stream/futures_unordered/mod.rs | 5 +++-- futures-util/src/stream/futures_unordered/task.rs | 2 +- futures/tests/stream_futures_unordered.rs | 2 -- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 15b932afc4..fdbd53de8e 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -390,6 +390,7 @@ impl Stream for FuturesUnordered { // Keep track of how many child futures we have polled, // in case we want to forcibly yield. let mut polled = 0; + let mut yielded = 0; // Ensure `parent` is correctly set. self.ready_to_run_queue.waker.register(cx.waker()); @@ -519,7 +520,7 @@ impl Stream for FuturesUnordered { let task = bomb.task.take().unwrap(); // If the future was awoken during polling, we assume // the future wanted to explicitly yield. - let yielded = task.woken.load(Relaxed); + yielded += task.woken.load(Relaxed) as usize; bomb.queue.link(task); // If a future yields, we respect it and yield here. @@ -527,7 +528,7 @@ impl Stream for FuturesUnordered { // avoid starving other tasks waiting on the executor. // (polling the same future twice per iteration may cause // the problem: https://github.com/rust-lang/futures-rs/pull/2333) - if yielded || polled == len { + if yielded >= 2 || polled == len { cx.waker().wake_by_ref(); return Poll::Pending; } diff --git a/futures-util/src/stream/futures_unordered/task.rs b/futures-util/src/stream/futures_unordered/task.rs index 2ca6c3c8af..ec2114effa 100644 --- a/futures-util/src/stream/futures_unordered/task.rs +++ b/futures-util/src/stream/futures_unordered/task.rs @@ -32,7 +32,7 @@ pub(super) struct Task { // Whether or not this task is currently in the ready to run queue pub(super) queued: AtomicBool, - // Whether the future waken before it finishes polling + // Whether the future was awoken during polling // It is possible for this flag to be set to true after the polling, // but it will be ignored. pub(super) woken: AtomicBool, diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 26954c6a05..8ddd62f4a8 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -268,8 +268,6 @@ fn futures_not_moved_after_poll() { let fut = future::ready(()).pending_once().assert_unmoved(); let mut stream = vec![fut; 3].into_iter().collect::>(); assert_stream_pending!(stream); - assert_stream_pending!(stream); - assert_stream_pending!(stream); assert_stream_next!(stream, ()); assert_stream_next!(stream, ()); assert_stream_next!(stream, ());