diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 6bc930f074b..af255dae9e7 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -219,13 +219,11 @@ impl Inner

{ let _enter = crate::runtime::enter(false); let waker = scheduler.spawner.waker_ref(); let mut cx = std::task::Context::from_waker(&waker); - let mut polled = false; pin!(future); 'outer: loop { - if scheduler.spawner.reset_woken() || !polled { - polled = true; + if scheduler.spawner.reset_woken() { scheduler.stats.incr_poll_count(); if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { return v; @@ -418,6 +416,9 @@ impl Spawner { } fn waker_ref(&self) -> WakerRef<'_> { + // Set woken to true when enter block_on, ensure outer future + // be polled for the first time when enter loop + self.shared.woken.store(true, Release); waker_ref(&self.shared) } diff --git a/tokio/src/runtime/tests/loom_basic_scheduler.rs b/tokio/src/runtime/tests/loom_basic_scheduler.rs index e6221d3b179..447fc97974c 100644 --- a/tokio/src/runtime/tests/loom_basic_scheduler.rs +++ b/tokio/src/runtime/tests/loom_basic_scheduler.rs @@ -63,6 +63,45 @@ fn block_on_num_polls() { }); } +#[test] +fn assert_no_unnecessary_polls() { + loom::model(|| { + // // After we poll outer future, woken should reset to false + let rt = Builder::new_current_thread().build().unwrap(); + let (tx, rx) = oneshot::channel(); + let pending_cnt = Arc::new(AtomicUsize::new(0)); + + rt.spawn(async move { + for _ in 0..24 { + task::yield_now().await; + } + tx.send(()).unwrap(); + }); + + let pending_cnt_clone = pending_cnt.clone(); + rt.block_on(async move { + // use task::yield_now() to ensure woken set to true + // ResetFuture will be polled at most once + // Here comes two cases + // 1. recv no message from channel, ResetFuture will be polled + // but get Pending and we record ResetFuture.pending_cnt ++. + // Then when message arrive, ResetFuture returns Ready. So we + // expect ResetFuture.pending_cnt = 1 + // 2. recv message from channel, ResetFuture returns Ready immediately. + // We expect ResetFuture.pending_cnt = 0 + task::yield_now().await; + ResetFuture { + rx, + pending_cnt: pending_cnt_clone, + } + .await; + }); + + let pending_cnt = pending_cnt.load(Acquire); + assert!(pending_cnt <= 1); + }); +} + struct BlockedFuture { rx: Receiver<()>, num_polls: Arc, @@ -80,3 +119,22 @@ impl Future for BlockedFuture { } } } + +struct ResetFuture { + rx: Receiver<()>, + pending_cnt: Arc, +} + +impl Future for ResetFuture { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.rx).poll(cx) { + Poll::Pending => { + self.pending_cnt.fetch_add(1, Release); + Poll::Pending + } + _ => Poll::Ready(()), + } + } +}