From a3cf20f10750e8eac4d7421862b94cff591af976 Mon Sep 17 00:00:00 2001 From: suika Date: Sat, 9 Oct 2021 11:39:54 +0800 Subject: [PATCH] reset woken of outer future after polled --- tokio/src/runtime/basic_scheduler.rs | 7 +- .../src/runtime/tests/loom_basic_scheduler.rs | 72 ++++++++++++++++++- 2 files changed, 73 insertions(+), 6 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 6bc930f074b..af255dae9e7 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -219,13 +219,11 @@ impl Inner

{ let _enter = crate::runtime::enter(false); let waker = scheduler.spawner.waker_ref(); let mut cx = std::task::Context::from_waker(&waker); - let mut polled = false; pin!(future); 'outer: loop { - if scheduler.spawner.reset_woken() || !polled { - polled = true; + if scheduler.spawner.reset_woken() { scheduler.stats.incr_poll_count(); if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { return v; @@ -418,6 +416,9 @@ impl Spawner { } fn waker_ref(&self) -> WakerRef<'_> { + // Set woken to true when enter block_on, ensure outer future + // be polled for the first time when enter loop + self.shared.woken.store(true, Release); waker_ref(&self.shared) } diff --git a/tokio/src/runtime/tests/loom_basic_scheduler.rs b/tokio/src/runtime/tests/loom_basic_scheduler.rs index e6221d3b179..515b3c4c9ac 100644 --- a/tokio/src/runtime/tests/loom_basic_scheduler.rs +++ b/tokio/src/runtime/tests/loom_basic_scheduler.rs @@ -1,12 +1,12 @@ -use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::atomic::{AtomicUsize, AtomicBool}; use crate::loom::sync::Arc; use crate::loom::thread; use crate::runtime::{Builder, Runtime}; use crate::sync::oneshot::{self, Receiver}; use crate::task; -use std::future::Future; +use std::future::{self, Future}; use std::pin::Pin; -use std::sync::atomic::Ordering::{Acquire, Release}; +use std::sync::atomic::Ordering::{Acquire, Release, SeqCst}; use std::task::{Context, Poll}; fn assert_at_most_num_polls(rt: Arc, at_most_polls: usize) { @@ -31,6 +31,41 @@ fn assert_at_most_num_polls(rt: Arc, at_most_polls: usize) { assert!(polls <= at_most_polls); } +fn assert_no_unnecessary_poll(rt: Arc) { + let (tx, rx) = oneshot::channel(); + let unexpected_poll_cnt = Arc::new(AtomicUsize::new(0)); + let pending_cnt = Arc::new(AtomicUsize::new(0)); + + rt.spawn(async move { + for i in 0..24 { + task::yield_now().await; + } + tx.send(()).unwrap(); + }); + + let pending_cnt_clone = pending_cnt.clone(); + rt.block_on(async move { + // use future::ready(()) to ensure woken set to true + // ResetFuture will be polled at most once + // Here comes two cases + // 1. recv no message from channel, ResetFuture will be polled + // but get Pending and we record ResetFuture.pending_cnt ++= 1. + // Then when message arrive, ResetFuture returns Ready. So were + // expect ResetFuture.pending_cnt = 1 + // 2. recv message from channel, ResetFuture returns Ready immediately. + // We expect ResetFuture.pending_cnt = 0 + future::ready(()).await; + ResetFuture { + rx, + pending_cnt: pending_cnt_clone, + }.await; + }); + + + let pending_cnt = pending_cnt.load(Acquire); + assert!(pending_cnt <= 1); +} + #[test] fn block_on_num_polls() { loom::model(|| { @@ -63,6 +98,16 @@ fn block_on_num_polls() { }); } +#[test] +fn reset_woken() { + loom::model(|| { + // After we poll outer future, woken should reset to false + let rt = Arc::new(Builder::new_current_thread().build().unwrap()); + let th = thread::spawn(move || assert_no_unnecessary_poll(rt)); + th.join().unwrap(); + }); +} + struct BlockedFuture { rx: Receiver<()>, num_polls: Arc, @@ -80,3 +125,24 @@ impl Future for BlockedFuture { } } } + +struct ResetFuture { + rx: Receiver<()>, + pending_cnt: Arc, +} + +impl Future for ResetFuture { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.rx).poll(cx) { + Poll::Pending => { + self.pending_cnt.fetch_add(1, Release); + Poll::Pending + }, + _ => { + Poll::Ready(()) + } + } + } +}