From e6a9167bb7012f23640f5022bd97740d7829c4b1 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 16 Mar 2021 11:30:18 +0200 Subject: [PATCH] runtime: avoid unnecessary polling of block_on future (#3582) --- tokio/src/runtime/basic_scheduler.rs | 24 +++++- .../src/runtime/tests/loom_basic_scheduler.rs | 82 +++++++++++++++++++ tokio/src/runtime/tests/mod.rs | 1 + 3 files changed, 105 insertions(+), 2 deletions(-) create mode 100644 tokio/src/runtime/tests/loom_basic_scheduler.rs diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index aeb01504c19..b26c45d3a1d 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,4 +1,5 @@ use crate::future::poll_fn; +use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; @@ -10,6 +11,7 @@ use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; use std::time::Duration; @@ -70,6 +72,9 @@ struct Shared { /// Unpark the blocked thread unpark: Box, + + // indicates whether the blocked on thread was woken + woken: AtomicBool, } /// Thread-local context. @@ -85,6 +90,9 @@ struct Context { const INITIAL_CAPACITY: usize = 64; /// Max number of tasks to poll per tick. +#[cfg(loom)] +const MAX_TASKS_PER_TICK: usize = 4; +#[cfg(not(loom))] const MAX_TASKS_PER_TICK: usize = 61; /// How often to check the remote queue first. @@ -101,6 +109,7 @@ impl BasicScheduler

{ shared: Arc::new(Shared { queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), unpark: unpark as Box, + woken: AtomicBool::new(false), }), }; @@ -177,12 +186,16 @@ impl Inner

{ let _enter = crate::runtime::enter(false); let waker = scheduler.spawner.waker_ref(); let mut cx = std::task::Context::from_waker(&waker); + let mut polled = false; pin!(future); 'outer: loop { - if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { - return v; + if scheduler.spawner.was_woken() || !polled { + polled = true; + if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { + return v; + } } for _ in 0..MAX_TASKS_PER_TICK { @@ -329,8 +342,14 @@ impl Spawner { } fn waker_ref(&self) -> WakerRef<'_> { + // clear the woken bit + self.shared.woken.swap(false, AcqRel); waker_ref(&self.shared) } + + fn was_woken(&self) -> bool { + self.shared.woken.load(Acquire) + } } impl fmt::Debug for Spawner { @@ -384,6 +403,7 @@ impl Wake for Shared { /// Wake by reference fn wake_by_ref(arc_self: &Arc) { + arc_self.woken.store(true, Release); arc_self.unpark.unpark(); } } diff --git a/tokio/src/runtime/tests/loom_basic_scheduler.rs b/tokio/src/runtime/tests/loom_basic_scheduler.rs new file mode 100644 index 00000000000..e6221d3b179 --- /dev/null +++ b/tokio/src/runtime/tests/loom_basic_scheduler.rs @@ -0,0 +1,82 @@ +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Arc; +use crate::loom::thread; +use crate::runtime::{Builder, Runtime}; +use crate::sync::oneshot::{self, Receiver}; +use crate::task; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::Ordering::{Acquire, Release}; +use std::task::{Context, Poll}; + +fn assert_at_most_num_polls(rt: Arc, at_most_polls: usize) { + let (tx, rx) = oneshot::channel(); + let num_polls = Arc::new(AtomicUsize::new(0)); + rt.spawn(async move { + for _ in 0..12 { + task::yield_now().await; + } + tx.send(()).unwrap(); + }); + + rt.block_on(async { + BlockedFuture { + rx, + num_polls: num_polls.clone(), + } + .await; + }); + + let polls = num_polls.load(Acquire); + assert!(polls <= at_most_polls); +} + +#[test] +fn block_on_num_polls() { + loom::model(|| { + // we expect at most 3 number of polls because there are + // three points at which we poll the future. At any of these + // points it can be ready: + // + // - when we fail to steal the parker and we block on a + // notification that it is available. + // + // - when we steal the parker and we schedule the future + // + // - when the future is woken up and we have ran the max + // number of tasks for the current tick or there are no + // more tasks to run. + // + let at_most = 3; + + let rt1 = Arc::new(Builder::new_current_thread().build().unwrap()); + let rt2 = rt1.clone(); + let rt3 = rt1.clone(); + + let th1 = thread::spawn(move || assert_at_most_num_polls(rt1, at_most)); + let th2 = thread::spawn(move || assert_at_most_num_polls(rt2, at_most)); + let th3 = thread::spawn(move || assert_at_most_num_polls(rt3, at_most)); + + th1.join().unwrap(); + th2.join().unwrap(); + th3.join().unwrap(); + }); +} + +struct BlockedFuture { + rx: Receiver<()>, + num_polls: Arc, +} + +impl Future for BlockedFuture { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.num_polls.fetch_add(1, Release); + + match Pin::new(&mut self.rx).poll(cx) { + Poll::Pending => Poll::Pending, + _ => Poll::Ready(()), + } + } +} diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index 123a7e35a37..ebb48de5290 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -1,4 +1,5 @@ cfg_loom! { + mod loom_basic_scheduler; mod loom_blocking; mod loom_oneshot; mod loom_pool;