runtime: avoid unnecessary polling of block_on future #3582

Merged · 7 commits · Mar 16, 2021

24 changes: 22 additions & 2 deletions tokio/src/runtime/basic_scheduler.rs
@@ -1,4 +1,5 @@
use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime::task::{self, JoinHandle, Schedule, Task};
@@ -10,6 +11,7 @@ use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
use std::time::Duration;
@@ -70,6 +72,9 @@ struct Shared {

/// Unpark the blocked thread
unpark: Box<dyn Unpark>,

/// Indicates whether the blocked-on thread was woken.
woken: AtomicBool,
Comment on lines +76 to +77

Contributor: What happens if we have multiple calls to block_on?

Contributor (author): @Darksonn the field is set to false in Spawner::waker_ref. Wouldn't that work?

Contributor: If you implement the loom test I have mentioned below, that will answer this question.

Member: There is additional synchronization that ensures only one concurrent blocker hits this bool.
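
A minimal sketch of the synchronization being referred to, assuming the basic scheduler hands a single parker between concurrent block_on callers through a mutex-plus-condvar handoff (the test below calls this "stealing the parker"). All names here are illustrative, not tokio's actual code:

```rust
use std::sync::{Condvar, Mutex};

/// Illustrative stand-in for the scheduler core that owns the parker
/// and, with it, exclusive access to the `woken` flag.
struct Core;

/// At most one block_on caller holds `Core` at a time; the others
/// wait on a condvar until the current holder puts it back.
struct Handoff {
    core: Mutex<Option<Core>>,
    available: Condvar,
}

impl Handoff {
    /// "Steal the parker": take exclusive ownership of the core,
    /// blocking until the current blocker releases it.
    fn acquire(&self) -> Core {
        let mut slot = self.core.lock().unwrap();
        loop {
            if let Some(core) = slot.take() {
                return core;
            }
            slot = self.available.wait(slot).unwrap();
        }
    }

    /// Return the core so the next queued block_on caller can run.
    fn release(&self, core: Core) {
        *self.core.lock().unwrap() = Some(core);
        self.available.notify_one();
    }
}
```

Because `acquire` hands out the core exclusively, the `woken` bit cleared in `waker_ref` cannot race with another blocker's loop.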

}

/// Thread-local context.
@@ -85,6 +90,9 @@ struct Context {
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
#[cfg(loom)]
const MAX_TASKS_PER_TICK: usize = 4;
#[cfg(not(loom))]
const MAX_TASKS_PER_TICK: usize = 61;

/// How often to check the remote queue first.
@@ -101,6 +109,7 @@ impl<P: Park> BasicScheduler<P> {
shared: Arc::new(Shared {
queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
unpark: unpark as Box<dyn Unpark>,
woken: AtomicBool::new(false),
}),
};

@@ -177,12 +186,16 @@ impl<P: Park> Inner<P> {
let _enter = crate::runtime::enter(false);
let waker = scheduler.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
let mut polled = false;

pin!(future);

'outer: loop {
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
if scheduler.spawner.was_woken() || !polled {
polled = true;
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
}
}

for _ in 0..MAX_TASKS_PER_TICK {
@@ -329,8 +342,14 @@ impl Spawner {
}

fn waker_ref(&self) -> WakerRef<'_> {
// clear the woken bit
self.shared.woken.swap(false, AcqRel);
waker_ref(&self.shared)
}

fn was_woken(&self) -> bool {
self.shared.woken.load(Acquire)
}
}

impl fmt::Debug for Spawner {
@@ -384,6 +403,7 @@ impl Wake for Shared {

/// Wake by reference
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.woken.store(true, Release);
arc_self.unpark.unpark();
}
}
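
Taken together, the changes to this file can be sketched in isolation. The toy below is not tokio's implementation: it spins instead of parking and running queued tasks, and the names are illustrative. It does mirror the diff's key behavior: the future is polled once unconditionally, then re-polled only after its waker has set the woken flag, with the Release store in wake pairing with the Acquire load in the loop.

```rust
use std::future::Future;
use std::sync::atomic::{
    AtomicBool,
    Ordering::{AcqRel, Acquire, Release},
};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Toy stand-in for the scheduler's shared state.
struct Shared {
    woken: AtomicBool,
}

impl Wake for Shared {
    fn wake(self: Arc<Self>) {
        // Mirrors Wake::wake_by_ref in the diff: record the wakeup.
        // A real scheduler would also unpark the blocked thread.
        self.woken.store(true, Release);
    }
}

/// Simplified shape of the patched block_on: poll once up front,
/// then only when the waker has actually fired.
fn block_on<F: Future>(future: F) -> F::Output {
    let shared = Arc::new(Shared {
        woken: AtomicBool::new(false),
    });
    // Mirrors Spawner::waker_ref: clear any stale wakeup first.
    shared.woken.swap(false, AcqRel);
    let waker = Waker::from(shared.clone());
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    let mut polled = false;

    loop {
        if shared.woken.load(Acquire) || !polled {
            polled = true;
            if let Poll::Ready(v) = future.as_mut().poll(&mut cx) {
                return v;
            }
        }
        // A real scheduler runs queued tasks and parks here; the toy
        // just yields. Note that, as in the diff, `woken` is cleared
        // only once up front, so after the first wakeup the future is
        // polled on every subsequent iteration.
        std::thread::yield_now();
    }
}

fn main() {
    // A ready-immediately future exercises the `!polled` branch.
    assert_eq!(block_on(async { 42 }), 42);
}
```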
82 changes: 82 additions & 0 deletions tokio/src/runtime/tests/loom_basic_scheduler.rs
@@ -0,0 +1,82 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::loom::thread;
use crate::runtime::{Builder, Runtime};
use crate::sync::oneshot::{self, Receiver};
use crate::task;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering::{Acquire, Release};
use std::task::{Context, Poll};

fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) {
let (tx, rx) = oneshot::channel();
let num_polls = Arc::new(AtomicUsize::new(0));
rt.spawn(async move {
for _ in 0..12 {
task::yield_now().await;
}
tx.send(()).unwrap();
});

rt.block_on(async {
BlockedFuture {
rx,
num_polls: num_polls.clone(),
}
.await;
});

let polls = num_polls.load(Acquire);
assert!(polls <= at_most_polls);
}

#[test]
fn block_on_num_polls() {
loom::model(|| {
// we expect at most 3 polls because there are three
// points at which we poll the future, and at any of these
// points it can be ready:
//
// - when we fail to steal the parker and block on a
// notification that it is available.
//
// - when we steal the parker and schedule the future
//
// - when the future is woken up and we have run the max
// number of tasks for the current tick or there are no
// more tasks to run.
//
let at_most = 3;

let rt1 = Arc::new(Builder::new_current_thread().build().unwrap());
let rt2 = rt1.clone();
let rt3 = rt1.clone();

let th1 = thread::spawn(move || assert_at_most_num_polls(rt1, at_most));
let th2 = thread::spawn(move || assert_at_most_num_polls(rt2, at_most));
let th3 = thread::spawn(move || assert_at_most_num_polls(rt3, at_most));

th1.join().unwrap();
th2.join().unwrap();
th3.join().unwrap();
});
}

struct BlockedFuture {
rx: Receiver<()>,
num_polls: Arc<AtomicUsize>,
}

impl Future for BlockedFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.num_polls.fetch_add(1, Release);

match Pin::new(&mut self.rx).poll(cx) {
Poll::Pending => Poll::Pending,
_ => Poll::Ready(()),
}
}
}
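
BlockedFuture hard-codes the poll counting this test needs; the same idea generalizes into a small adapter that wraps any future and counts its polls. This is an illustrative sketch, not part of the PR or of tokio; all names here are hypothetical:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::sync::Arc;
use std::task::{Context, Poll};

/// Generic poll-counting adapter: wraps any future and bumps a
/// shared counter each time it is polled.
struct CountPolls<F> {
    inner: Pin<Box<F>>,
    count: Arc<AtomicUsize>,
}

impl<F> CountPolls<F> {
    fn new(inner: F, count: Arc<AtomicUsize>) -> Self {
        Self { inner: Box::pin(inner), count }
    }
}

impl<F: Future> Future for CountPolls<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Relaxed suffices here because the counter is read on the
        // same thread after the future completes; the loom test above
        // uses Release/Acquire to be explicit under its model.
        self.count.fetch_add(1, Relaxed);
        self.inner.as_mut().poll(cx)
    }
}

fn main() {
    // Tiny usage example with a manual poll; a real test would drive
    // this through a runtime's block_on instead.
    let count = Arc::new(AtomicUsize::new(0));
    let mut fut = CountPolls::new(async { 7 }, count.clone());

    // Build a no-op waker via the std Wake trait.
    struct Noop;
    impl std::task::Wake for Noop {
        fn wake(self: Arc<Self>) {}
    }
    let waker = std::task::Waker::from(Arc::new(Noop));
    let mut cx = Context::from_waker(&waker);

    assert!(matches!(Pin::new(&mut fut).poll(&mut cx), Poll::Ready(7)));
    assert_eq!(count.load(Relaxed), 1);
}
```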
1 change: 1 addition & 0 deletions tokio/src/runtime/tests/mod.rs
@@ -1,4 +1,5 @@
cfg_loom! {
mod loom_basic_scheduler;
mod loom_blocking;
mod loom_oneshot;
mod loom_pool;