Skip to content

Commit

Permalink
runtime: avoid unnecessary polling of block_on future (#3582)
Browse files Browse the repository at this point in the history
  • Loading branch information
zaharidichev committed Mar 16, 2021
1 parent 345b29c commit e6a9167
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 2 deletions.
24 changes: 22 additions & 2 deletions tokio/src/runtime/basic_scheduler.rs
@@ -1,4 +1,5 @@
use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime::task::{self, JoinHandle, Schedule, Task};
Expand All @@ -10,6 +11,7 @@ use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
use std::time::Duration;
Expand Down Expand Up @@ -70,6 +72,9 @@ struct Shared {

/// Unpark the blocked thread
unpark: Box<dyn Unpark>,

// indicates whether the blocked on thread was woken
woken: AtomicBool,
}

/// Thread-local context.
Expand All @@ -85,6 +90,9 @@ struct Context {
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
#[cfg(loom)]
const MAX_TASKS_PER_TICK: usize = 4;
#[cfg(not(loom))]
const MAX_TASKS_PER_TICK: usize = 61;

/// How often to check the remote queue first.
Expand All @@ -101,6 +109,7 @@ impl<P: Park> BasicScheduler<P> {
shared: Arc::new(Shared {
queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
unpark: unpark as Box<dyn Unpark>,
woken: AtomicBool::new(false),
}),
};

Expand Down Expand Up @@ -177,12 +186,16 @@ impl<P: Park> Inner<P> {
let _enter = crate::runtime::enter(false);
let waker = scheduler.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
let mut polled = false;

pin!(future);

'outer: loop {
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
if scheduler.spawner.was_woken() || !polled {
polled = true;
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
}
}

for _ in 0..MAX_TASKS_PER_TICK {
Expand Down Expand Up @@ -329,8 +342,14 @@ impl Spawner {
}

fn waker_ref(&self) -> WakerRef<'_> {
// clear the woken bit
self.shared.woken.swap(false, AcqRel);
waker_ref(&self.shared)
}

fn was_woken(&self) -> bool {
self.shared.woken.load(Acquire)
}
}

impl fmt::Debug for Spawner {
Expand Down Expand Up @@ -384,6 +403,7 @@ impl Wake for Shared {

/// Wake by reference
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.woken.store(true, Release);
arc_self.unpark.unpark();
}
}
Expand Down
82 changes: 82 additions & 0 deletions tokio/src/runtime/tests/loom_basic_scheduler.rs
@@ -0,0 +1,82 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::loom::thread;
use crate::runtime::{Builder, Runtime};
use crate::sync::oneshot::{self, Receiver};
use crate::task;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering::{Acquire, Release};
use std::task::{Context, Poll};

fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) {
let (tx, rx) = oneshot::channel();
let num_polls = Arc::new(AtomicUsize::new(0));
rt.spawn(async move {
for _ in 0..12 {
task::yield_now().await;
}
tx.send(()).unwrap();
});

rt.block_on(async {
BlockedFuture {
rx,
num_polls: num_polls.clone(),
}
.await;
});

let polls = num_polls.load(Acquire);
assert!(polls <= at_most_polls);
}

#[test]
fn block_on_num_polls() {
loom::model(|| {
// we expect at most 3 number of polls because there are
// three points at which we poll the future. At any of these
// points it can be ready:
//
// - when we fail to steal the parker and we block on a
// notification that it is available.
//
// - when we steal the parker and we schedule the future
//
// - when the future is woken up and we have ran the max
// number of tasks for the current tick or there are no
// more tasks to run.
//
let at_most = 3;

let rt1 = Arc::new(Builder::new_current_thread().build().unwrap());
let rt2 = rt1.clone();
let rt3 = rt1.clone();

let th1 = thread::spawn(move || assert_at_most_num_polls(rt1, at_most));
let th2 = thread::spawn(move || assert_at_most_num_polls(rt2, at_most));
let th3 = thread::spawn(move || assert_at_most_num_polls(rt3, at_most));

th1.join().unwrap();
th2.join().unwrap();
th3.join().unwrap();
});
}

struct BlockedFuture {
rx: Receiver<()>,
num_polls: Arc<AtomicUsize>,
}

impl Future for BlockedFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.num_polls.fetch_add(1, Release);

match Pin::new(&mut self.rx).poll(cx) {
Poll::Pending => Poll::Pending,
_ => Poll::Ready(()),
}
}
}
1 change: 1 addition & 0 deletions tokio/src/runtime/tests/mod.rs
@@ -1,4 +1,5 @@
cfg_loom! {
mod loom_basic_scheduler;
mod loom_blocking;
mod loom_oneshot;
mod loom_pool;
Expand Down

0 comments on commit e6a9167

Please sign in to comment.