rt: yield_now defers task until after driver poll #5223

Merged
merged 15 commits on Nov 30, 2022
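
For orientation before the diff: this change makes `tokio::task::yield_now()` stop waking its task inline from `poll`. The waker is instead parked in a per-worker defer list and notified only after the worker has polled the resource (I/O and time) drivers, so a yield-heavy task can no longer starve the drivers. A minimal sketch of that idea follows, where `YieldNow`, the free `defer` function, and the thread-local list are illustrative stand-ins, not tokio's actual internals:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;

        // Old behavior: re-queue the task immediately.
        //     cx.waker().wake_by_ref();
        //
        // New behavior (sketched): stash the waker; the runtime drains the
        // list after polling the drivers when the worker parks.
        defer(cx.waker().clone());

        Poll::Pending
    }
}

// Stand-in for the runtime-internal defer list added by this PR; in tokio
// it lives on the worker context (see `runtime/defer.rs` below).
thread_local! {
    static DEFERRED: std::cell::RefCell<Vec<Waker>> =
        std::cell::RefCell::new(Vec::new());
}

fn defer(waker: Waker) {
    DEFERRED.with(|d| d.borrow_mut().push(waker));
}
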
tokio/src/runtime/context.rs (2 changes: 1 addition & 1 deletion)
@@ -9,7 +9,6 @@ use crate::util::rand::{FastRand, RngSeed};
cfg_rt! {
use crate::runtime::{scheduler, task::Id, Defer};

use std::cell::RefCell;
use std::marker::PhantomData;
use std::time::Duration;
@@ -33,6 +32,7 @@ struct Context {

/// Yielded task wakers are stored here and notified after resource drivers
/// are polled.
+#[cfg(feature = "rt")]
defer: RefCell<Option<Defer>>,

#[cfg(any(feature = "rt", feature = "macros"))]
tokio/src/runtime/defer.rs (5 changes: 2 additions & 3 deletions)
@@ -1,8 +1,7 @@
-use std::collections::VecDeque;
use std::task::Waker;

pub(crate) struct Defer {
-deferred: VecDeque<Waker>,
+deferred: Vec<Waker>,
}

impl Defer {
@@ -13,7 +12,7 @@ impl Defer {
}

pub(crate) fn defer(&mut self, waker: Waker) {
-self.deferred.push_back(waker);
+self.deferred.push(waker);
}

pub(crate) fn is_empty(&self) -> bool {
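
Taken together, this hunk leaves `Defer` looking roughly like the sketch below. The `wake()` method and its park-time call site are assumptions inferred from the doc comment in `context.rs` above; they are not shown in this diff. Because every deferred waker is drained in a single pass when the worker parks, the FIFO semantics of `VecDeque` buy nothing over a plain `Vec`:

use std::task::Waker;

pub(crate) struct Defer {
    deferred: Vec<Waker>,
}

impl Defer {
    pub(crate) fn new() -> Defer {
        Defer {
            deferred: Vec::new(),
        }
    }

    pub(crate) fn defer(&mut self, waker: Waker) {
        self.deferred.push(waker);
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.deferred.is_empty()
    }

    // Assumed drain step, not shown in this hunk: called after the worker
    // polls the resource drivers, waking every task that yielded.
    pub(crate) fn wake(&mut self) {
        for waker in self.deferred.drain(..) {
            waker.wake();
        }
    }
}
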
tokio/tests/rt_common.rs (71 changes: 70 additions & 1 deletion)
@@ -9,6 +9,8 @@ macro_rules! rt_test {
mod current_thread_scheduler {
$($t)*

const NUM_WORKERS: usize = 1;

fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
@@ -22,6 +24,8 @@
mod threaded_scheduler_4_threads {
$($t)*

const NUM_WORKERS: usize = 4;

fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
@@ -36,6 +40,8 @@
mod threaded_scheduler_1_thread {
$($t)*

const NUM_WORKERS: usize = 1;

fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
@@ -652,7 +658,12 @@ rt_test! {
for _ in 0..100 {
rt.spawn(async {
loop {
-tokio::task::yield_now().await;
+// Don't use Tokio's `yield_now()` to avoid special defer
+// logic.
+let _: () = futures::future::poll_fn(|cx| {
+    cx.waker().wake_by_ref();
+    std::task::Poll::Pending
+}).await;
}
});
}
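
For readers unfamiliar with the pattern above: a future that calls `wake_by_ref()` and returns `Pending` is re-queued by the scheduler on every poll, bypassing the defer list that `yield_now()` now uses, which is exactly why the test switches to it. The same trick using only `std`, as a sketch (`SpinYield` is a made-up name):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Always-pending future that immediately re-schedules itself, mirroring
// the `poll_fn` used in the test above.
struct SpinYield;

impl Future for SpinYield {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Re-queue this task right away; unlike `yield_now()` after this
        // PR, the waker is not deferred until the driver poll.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
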
@@ -680,6 +691,64 @@
});
}

/// Tests that yielded tasks are not scheduled until **after** resource
/// drivers are polled.
#[test]
#[cfg(not(target_os = "wasi"))]
fn yield_defers_until_park() {
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::Barrier;

let rt = rt();

let flag = Arc::new(AtomicBool::new(false));
let barrier = Arc::new(Barrier::new(NUM_WORKERS));

rt.block_on(async {
// Make sure other workers cannot steal tasks
for _ in 0..(NUM_WORKERS - 1) {
let flag = flag.clone();
let barrier = barrier.clone();

tokio::spawn(async move {
barrier.wait();

while !flag.load(SeqCst) {
std::thread::sleep(std::time::Duration::from_millis(1));
}
});
}

barrier.wait();

tokio::spawn(async move {
// Create a TCP listener
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
let addr = listener.local_addr().unwrap();

tokio::join!(
async {
// Intentionally connect using the blocking std API.
let _socket = std::net::TcpStream::connect(addr).unwrap();

// Yield until the accept side sets the flag.
let mut cnt = 0;
while !flag.load(SeqCst) {
tokio::task::yield_now().await;
cnt += 1;
}

assert!(cnt < 3, "actual={}", cnt);
Review comment (Contributor):
Shouldn't this assertion complete after one iteration?

Reply (Member Author):
False positives and all that are possible...

Reply (Member Author):
I'm also seeing cases where the OS appears to delay the I/O notification. I don't think there is any reliable way to test this here. I added a loom test, though.

},
async {
let _ = listener.accept().await.unwrap();
flag.store(true, SeqCst);
}
);
}).await.unwrap();
});
}

#[cfg(not(target_os = "wasi"))] // Wasi does not support threads
#[test]
fn client_server_block_on() {
Expand Down