rt: yield_now defers task until after driver poll #5223

Merged
merged 15 commits on Nov 30, 2022
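In short: before this change, yield_now woke its task's waker immediately, so a yielded task went straight back onto the run queue and a yield-heavy loop could keep the scheduler busy without ever polling the I/O and time drivers. After this change, the waker is pushed onto a per-thread Defer queue and is notified only after the resource drivers have been polled. The public API is unchanged; the sketch below is illustrative (assuming a default runtime), showing the kind of program the deferred wake helps:

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let timer = tokio::spawn(async {
        // Completes only once the time driver has been polled.
        sleep(Duration::from_millis(10)).await;
    });

    // A yield-heavy loop. With deferred wakes, the scheduler's idle path
    // polls the drivers before re-running this task, so the sleep above
    // keeps making progress.
    for _ in 0..1_000 {
        tokio::task::yield_now().await;
    }

    timer.await.unwrap();
}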
30 changes: 28 additions & 2 deletions tokio/src/runtime/context.rs
@@ -7,8 +7,8 @@ use std::cell::Cell;
use crate::util::rand::{FastRand, RngSeed};

cfg_rt! {
-use crate::runtime::scheduler;
-use crate::runtime::task::Id;
+use crate::runtime::{scheduler, task::Id, Defer};

use std::cell::RefCell;
use std::marker::PhantomData;
@@ -19,6 +19,7 @@ struct Context {
/// Handle to the runtime scheduler running on the current thread.
#[cfg(feature = "rt")]
handle: RefCell<Option<scheduler::Handle>>,

#[cfg(feature = "rt")]
current_task_id: Cell<Option<Id>>,

@@ -30,6 +31,10 @@ struct Context {
#[cfg(feature = "rt")]
runtime: Cell<EnterRuntime>,

/// Yielded task wakers are stored here and notified after resource drivers
/// are polled.
#[cfg(feature = "rt")]
defer: RefCell<Option<Defer>>,

#[cfg(any(feature = "rt", feature = "macros"))]
rng: FastRand,

@@ -56,6 +61,9 @@ tokio_thread_local! {
#[cfg(feature = "rt")]
runtime: Cell::new(EnterRuntime::NotEntered),

#[cfg(feature = "rt")]
defer: RefCell::new(None),

#[cfg(any(feature = "rt", feature = "macros"))]
rng: FastRand::new(RngSeed::new()),

@@ -159,7 +167,12 @@ cfg_rt! {
if c.runtime.get().is_entered() {
None
} else {
// Set the entered flag
c.runtime.set(EnterRuntime::Entered { allow_block_in_place });

// Initialize queue to track yielded tasks
*c.defer.borrow_mut() = Some(Defer::new());

Some(EnterRuntimeGuard {
blocking: BlockingRegionGuard::new(),
handle: c.set_current(handle),
@@ -201,6 +214,14 @@ cfg_rt! {
DisallowBlockInPlaceGuard(reset)
}

pub(crate) fn with_defer<R>(f: impl FnOnce(&mut Defer) -> R) -> Option<R> {
CONTEXT.with(|c| {
let mut defer = c.defer.borrow_mut();

defer.as_mut().map(|defer| f(defer))
})
}

impl Context {
fn set_current(&self, handle: &scheduler::Handle) -> SetCurrentGuard {
let rng_seed = handle.seed_generator().next_seed();
@@ -235,6 +256,7 @@ cfg_rt! {
CONTEXT.with(|c| {
assert!(c.runtime.get().is_entered());
c.runtime.set(EnterRuntime::NotEntered);
*c.defer.borrow_mut() = None;
});
}
}
@@ -286,6 +308,10 @@ cfg_rt! {
return Err(());
}

// Wake any yielded tasks before parking in order to avoid
// blocking.
with_defer(|defer| defer.wake());

park.park_timeout(when - now);
}
}
28 changes: 28 additions & 0 deletions tokio/src/runtime/defer.rs
@@ -0,0 +1,28 @@
use std::collections::VecDeque;
use std::task::Waker;

pub(crate) struct Defer {
deferred: VecDeque<Waker>,
}

impl Defer {
pub(crate) fn new() -> Defer {
Defer {
deferred: Default::default(),
}
}

pub(crate) fn defer(&mut self, waker: Waker) {
self.deferred.push_back(waker);
}

pub(crate) fn is_empty(&self) -> bool {
self.deferred.is_empty()
}

pub(crate) fn wake(&mut self) {
for waker in self.deferred.drain(..) {
waker.wake();
}
}
}
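Defer is crate-private, so the queue above cannot be exercised directly from user code. The standalone sketch below mirrors its behavior with a plain VecDeque<Waker>; the Flag waker is a hypothetical stand-in used only to observe wake():

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

// A waker that records that it was woken.
struct Flag(AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

fn main() {
    let flag = Arc::new(Flag(AtomicBool::new(false)));
    let waker = Waker::from(flag.clone());

    let mut deferred: VecDeque<Waker> = VecDeque::new();
    deferred.push_back(waker);     // Defer::defer
    assert!(!deferred.is_empty()); // Defer::is_empty
    for w in deferred.drain(..) {  // Defer::wake
        w.wake();
    }
    assert!(flag.0.load(Ordering::SeqCst));
}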
3 changes: 3 additions & 0 deletions tokio/src/runtime/mod.rs
@@ -228,6 +228,9 @@ cfg_rt! {
pub use crate::util::rand::RngSeed;
}

mod defer;
pub(crate) use defer::Defer;

mod handle;
pub use handle::{EnterGuard, Handle, TryCurrentError};

5 changes: 5 additions & 0 deletions tokio/src/runtime/park.rs
@@ -273,6 +273,11 @@ impl CachedParkThread {
return Ok(v);
}

// Wake any yielded tasks before parking in order to avoid
// blocking.
#[cfg(feature = "rt")]
crate::runtime::context::with_defer(|defer| defer.wake());

self.park();
}
}
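The wake above matters most for CachedParkThread::block_on: this path has no resource driver to poll, so if the deferred wakers were left queued while the thread parked, a task that had merely yielded would never be rescheduled and block_on could hang. Draining the queue right before park() preserves wake-on-yield behavior for this executor.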
18 changes: 16 additions & 2 deletions tokio/src/runtime/scheduler/current_thread.rs
@@ -3,7 +3,7 @@ use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::driver::{self, Driver};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
-use crate::runtime::{blocking, scheduler, Config};
+use crate::runtime::{blocking, context, scheduler, Config};
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
@@ -267,6 +267,14 @@ impl Core {
}
}

fn did_defer_tasks() -> bool {
context::with_defer(|deferred| !deferred.is_empty()).unwrap()
}

fn wake_deferred_tasks() {
context::with_defer(|deferred| deferred.wake());
}

// ===== impl Context =====

impl Context {
@@ -299,6 +307,7 @@ impl Context {

let (c, _) = self.enter(core, || {
driver.park(&handle.driver);
wake_deferred_tasks();
});

core = c;
@@ -324,6 +333,7 @@ impl Context {
core.metrics.submit(&handle.shared.worker_metrics);
let (mut core, _) = self.enter(core, || {
driver.park_timeout(&handle.driver, Duration::from_millis(0));
wake_deferred_tasks();
});

core.driver = Some(driver);
@@ -557,7 +567,11 @@ impl CoreGuard<'_> {
let task = match entry {
Some(entry) => entry,
None => {
-core = context.park(core, handle);
+core = if did_defer_tasks() {
+    context.park_yield(core, handle)
+} else {
+    context.park(core, handle)
+};

// Try polling the `block_on` future next
continue 'outer;
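A note on the parking decision above: when did_defer_tasks() reports a non-empty defer queue, the scheduler takes park_yield instead of park. As the @@ -324,6 +333,7 @@ hunk shows, that path parks the driver with a zero-millisecond timeout, so the I/O and time drivers are polled once without blocking; wake_deferred_tasks() then moves the yielded tasks back into the run queue. The multi-thread worker below makes the same choice via park_timeout(core, Some(Duration::from_millis(0))).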
16 changes: 15 additions & 1 deletion tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -412,7 +412,11 @@ impl Context {
core = self.run_task(task, core)?;
} else {
// Wait for work
-core = self.park(core);
+core = if did_defer_tasks() {
+    self.park_timeout(core, Some(Duration::from_millis(0)))
+} else {
+    self.park(core)
+};
}
}

@@ -535,6 +539,8 @@ impl Context {
park.park(&self.worker.handle.driver);
}

wake_deferred_tasks();

// Remove `core` from context
core = self.core.borrow_mut().take().expect("core missing");

@@ -853,6 +859,14 @@ impl Handle {
}
}

fn did_defer_tasks() -> bool {
context::with_defer(|deferred| !deferred.is_empty()).unwrap()
}

fn wake_deferred_tasks() {
context::with_defer(|deferred| deferred.wake());
}

cfg_metrics! {
impl Shared {
pub(super) fn injection_queue_depth(&self) -> usize {
14 changes: 13 additions & 1 deletion tokio/src/task/yield_now.rs
@@ -1,3 +1,5 @@
use crate::runtime::context;

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -49,7 +51,17 @@ pub async fn yield_now() {
}

self.yielded = true;
-cx.waker().wake_by_ref();
+
+let defer = context::with_defer(|rt| {
+    rt.defer(cx.waker().clone());
+});
+
+if defer.is_none() {
+    // Not currently in a runtime, just notify ourselves
+    // immediately.
+    cx.waker().wake_by_ref();
+}

Poll::Pending
}
}
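Note the fallback: with_defer returns None when the calling thread has not entered a Tokio runtime, in which case yield_now wakes itself immediately, matching the old behavior. That keeps the future usable under other executors. A minimal sketch, assuming the futures crate is available as a dev-dependency:

// With no Tokio runtime entered there is no thread-local defer queue,
// so the waker fires immediately and the second poll returns Ready.
fn main() {
    futures::executor::block_on(async {
        tokio::task::yield_now().await;
    });
}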