diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 15b932afc4..fdbd53de8e 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -390,6 +390,7 @@ impl Stream for FuturesUnordered { // Keep track of how many child futures we have polled, // in case we want to forcibly yield. let mut polled = 0; + let mut yielded = 0; // Ensure `parent` is correctly set. self.ready_to_run_queue.waker.register(cx.waker()); @@ -519,7 +520,7 @@ impl Stream for FuturesUnordered { let task = bomb.task.take().unwrap(); // If the future was awoken during polling, we assume // the future wanted to explicitly yield. - let yielded = task.woken.load(Relaxed); + yielded += task.woken.load(Relaxed) as usize; bomb.queue.link(task); // If a future yields, we respect it and yield here. @@ -527,7 +528,7 @@ impl Stream for FuturesUnordered { // avoid starving other tasks waiting on the executor. // (polling the same future twice per iteration may cause // the problem: https://github.com/rust-lang/futures-rs/pull/2333) - if yielded || polled == len { + if yielded >= 2 || polled == len { cx.waker().wake_by_ref(); return Poll::Pending; } diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 26954c6a05..8ddd62f4a8 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -268,8 +268,6 @@ fn futures_not_moved_after_poll() { let fut = future::ready(()).pending_once().assert_unmoved(); let mut stream = vec![fut; 3].into_iter().collect::>(); assert_stream_pending!(stream); - assert_stream_pending!(stream); - assert_stream_pending!(stream); assert_stream_next!(stream, ()); assert_stream_next!(stream, ()); assert_stream_next!(stream, ());