diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index e37d8722321..af255dae9e7 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -12,7 +12,7 @@ use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +use std::sync::atomic::Ordering::{AcqRel, Release}; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; use std::time::Duration; @@ -219,13 +219,11 @@ impl Inner

{ let _enter = crate::runtime::enter(false); let waker = scheduler.spawner.waker_ref(); let mut cx = std::task::Context::from_waker(&waker); - let mut polled = false; pin!(future); 'outer: loop { - if scheduler.spawner.was_woken() || !polled { - polled = true; + if scheduler.spawner.reset_woken() { scheduler.stats.incr_poll_count(); if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { return v; @@ -418,13 +416,15 @@ impl Spawner { } fn waker_ref(&self) -> WakerRef<'_> { - // clear the woken bit - self.shared.woken.swap(false, AcqRel); + // Set woken to true when enter block_on, ensure outer future + // be polled for the first time when enter loop + self.shared.woken.store(true, Release); waker_ref(&self.shared) } - fn was_woken(&self) -> bool { - self.shared.woken.load(Acquire) + // reset woken to false and return original value + pub(crate) fn reset_woken(&self) -> bool { + self.shared.woken.swap(false, AcqRel) } } diff --git a/tokio/src/runtime/tests/loom_basic_scheduler.rs b/tokio/src/runtime/tests/loom_basic_scheduler.rs index e6221d3b179..d2894b9b27e 100644 --- a/tokio/src/runtime/tests/loom_basic_scheduler.rs +++ b/tokio/src/runtime/tests/loom_basic_scheduler.rs @@ -63,6 +63,45 @@ fn block_on_num_polls() { }); } +#[test] +fn assert_no_unnecessary_polls() { + loom::model(|| { + // // After we poll outer future, woken should reset to false + let rt = Builder::new_current_thread().build().unwrap(); + let (tx, rx) = oneshot::channel(); + let pending_cnt = Arc::new(AtomicUsize::new(0)); + + rt.spawn(async move { + for _ in 0..24 { + task::yield_now().await; + } + tx.send(()).unwrap(); + }); + + let pending_cnt_clone = pending_cnt.clone(); + rt.block_on(async move { + // use task::yield_now() to ensure woken set to true + // ResetFuture will be polled at most once + // Here comes two cases + // 1. recv no message from channel, ResetFuture will be polled + // but get Pending and we record ResetFuture.pending_cnt ++. + // Then when message arrive, ResetFuture returns Ready. So we + // expect ResetFuture.pending_cnt = 1 + // 2. recv message from channel, ResetFuture returns Ready immediately. + // We expect ResetFuture.pending_cnt = 0 + task::yield_now().await; + ResetFuture { + rx, + pending_cnt: pending_cnt_clone, + } + .await; + }); + + let pending_cnt = pending_cnt.load(Acquire); + assert!(pending_cnt <= 1); + }); +} + struct BlockedFuture { rx: Receiver<()>, num_polls: Arc, @@ -80,3 +119,22 @@ impl Future for BlockedFuture { } } } + +struct ResetFuture { + rx: Receiver<()>, + pending_cnt: Arc, +} + +impl Future for ResetFuture { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.rx).poll(cx) { + Poll::Pending => { + self.pending_cnt.fetch_add(1, Release); + Poll::Pending + } + _ => Poll::Ready(()), + } + } +}