task: stop polling more tasks when yield_now is used on a LocalSet #5211
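
As background for the diff below (not part of the PR itself), here is a minimal usage sketch of the situation the change targets: a task spawned on a `LocalSet` calls `tokio::task::yield_now()`, and with this change the `LocalSet` stops polling further tasks in the current tick and hands control back to the runtime driving it, instead of continuing to drain its queue.

```rust
use tokio::task::{self, LocalSet};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let local = LocalSet::new();

    local
        .run_until(async {
            let handle = task::spawn_local(async {
                for _ in 0..100 {
                    // With this change, each yield ends the current call to
                    // `LocalSet::tick`, handing control back to the runtime
                    // instead of the set continuing to poll more tasks.
                    task::yield_now().await;
                }
            });

            handle.await.unwrap();
        })
        .await;
}
```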

83 changes: 51 additions & 32 deletions tokio/src/task/local.rs
@@ -233,6 +233,10 @@ struct Context {
/// True if a task panicked without being handled and the local set is
/// configured to shutdown on unhandled panic.
unhandled_panic: Cell<bool>,

/// Should the currently running call to `tick` return after polling the
/// current future?
yield_now: Cell<bool>,
}

/// LocalSet state shared between threads.
@@ -393,6 +397,7 @@ impl LocalSet {
#[cfg(tokio_unstable)]
unhandled_panic: crate::runtime::UnhandledPanic::Ignore,
}),
yield_now: Cell::new(false),
unhandled_panic: Cell::new(false),
}),
_not_send: PhantomData,
@@ -598,10 +603,14 @@ impl LocalSet {
/// Ticks the scheduler, returning whether the local future needs to be
/// notified again.
fn tick(&self) -> bool {
self.context.yield_now.set(false);

for _ in 0..MAX_TASKS_PER_TICK {
// Make sure we didn't hit an unhandled panic
if self.context.unhandled_panic.get() {
panic!("a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic");
// If yield_now is set, then the task we polled in the previous
// iteration woke itself. In this case, we should yield to the
// scheduler immediately.
if self.context.yield_now.get() || self.context.unhandled_panic.get() {
break;
}

match self.next_task() {
@@ -621,6 +630,11 @@
}
}

// Make sure we didn't hit an unhandled panic.
if self.context.unhandled_panic.get() {
panic!("a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic");
}

true
}

@@ -896,7 +910,7 @@ impl Context {
};

if let Some(notified) = notified {
self.shared.schedule(notified);
self.shared.schedule(notified, false);
}

handle
@@ -938,41 +952,42 @@ impl<T: Future> Future for RunUntil<'_, T> {

impl Shared {
/// Schedule the provided task on the scheduler.
fn schedule(&self, task: task::Notified<Arc<Self>>) {
fn schedule(&self, task: task::Notified<Arc<Self>>, yield_now: bool) {
CURRENT.with(|localdata| {
match localdata.ctx.get() {
Some(cx) if cx.shared.ptr_eq(self) => unsafe {
// Safety: if the current `LocalSet` context points to this
// `LocalSet`, then we are on the thread that owns it.
cx.shared.local_state.task_push_back(task);
},

// We are on the thread that owns the `LocalSet`, so we can
// wake to the local queue.
_ if localdata.get_id() == Some(self.local_state.owner) => {
unsafe {
// Safety: we just checked that the thread ID matches
// the localset's owner, so this is safe.
self.local_state.task_push_back(task);
if localdata.get_id() == Some(self.local_state.owner) {
unsafe {
// Safety: we just checked that the thread ID matches
// the localset's owner, so this is safe.
self.local_state.task_push_back(task);
}

let mut should_wake = true;
if let Some(cx) = localdata.ctx.get() {
if cx.shared.ptr_eq(self) {
should_wake = false;
// If the future woke itself, then we should return
// from tick.
cx.yield_now.set(yield_now | cx.yield_now.get());
}
}
if should_wake {
// We still have to wake the `LocalSet`, because it isn't
// currently being polled.
self.waker.wake();
}

} else {
// We are *not* on the thread that owns the `LocalSet`, so we
// have to wake to the remote queue.
_ => {
// First, check whether the queue is still there (if not, the
// LocalSet is dropped). Then push to it if so, and if not,
// do nothing.
let mut lock = self.queue.lock();

if let Some(queue) = lock.as_mut() {
queue.push_back(task);
drop(lock);
self.waker.wake();
}
//
// First, check whether the queue is still there (if not, the
// LocalSet is dropped). Then push to it if so, and if not,
// do nothing.
let mut lock = self.queue.lock();

if let Some(queue) = lock.as_mut() {
queue.push_back(task);
drop(lock);
self.waker.wake();
}
}
});
@@ -994,7 +1009,11 @@ impl task::Schedule for Arc<Shared> {
}

fn schedule(&self, task: task::Notified<Self>) {
Shared::schedule(self, task);
Shared::schedule(self, task, false);
}

fn yield_now(&self, task: task::Notified<Self>) {
Shared::schedule(self, task, true);
}

cfg_unstable! {
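
Taken together, the `local.rs` changes make `tick` bail out as soon as the task it just polled re-scheduled itself via `yield_now`. Below is a simplified, self-contained sketch of that control flow; `Task`, `next_task`, and the budget value are stand-ins, and the real code also checks the unhandled-panic flag:

```rust
use std::cell::Cell;

// Stand-in for a scheduled task; in tokio this is `task::Notified<Arc<Shared>>`.
struct Task;

impl Task {
    fn run(self) {
        // Polling the task may set the `yield_now` flag via the scheduler.
    }
}

/// Simplified model of `LocalSet::tick` after this change: clear the flag,
/// then stop polling further tasks as soon as the task that was just polled
/// re-scheduled itself through `yield_now`.
fn tick(
    budget: usize,
    yield_now: &Cell<bool>,
    mut next_task: impl FnMut() -> Option<Task>,
) -> bool {
    yield_now.set(false);

    for _ in 0..budget {
        if yield_now.get() {
            // The previously polled task woke itself; yield back to the
            // runtime driving the `LocalSet` instead of polling more tasks.
            break;
        }
        match next_task() {
            Some(task) => task.run(),
            // Queue drained: the local future does not need another wake-up.
            None => return false,
        }
    }

    // Either the budget ran out or a task yielded; ask to be polled again.
    true
}

fn main() {
    let yielded = Cell::new(false);
    let mut remaining = 2;
    let again = tick(64, &yielded, || {
        if remaining == 0 {
            None
        } else {
            remaining -= 1;
            Some(Task)
        }
    });
    assert!(!again); // queue drained before the budget was exhausted
}
```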
34 changes: 34 additions & 0 deletions tokio/tests/task_local_set.rs
@@ -612,6 +612,40 @@ fn store_local_set_in_thread_local_with_runtime() {
});
}

#[test]
fn test_yield_now() {
use std::task::Poll;

static IS_OK: AtomicBool = AtomicBool::new(false);

let mut set = LocalSet::new();
let rt = rt();

let jh = set.spawn_local(async {
// If polled once, then it is ok.
IS_OK.store(true, Ordering::SeqCst);

tokio::task::yield_now().await;

// If polled twice, then it is no longer ok.
IS_OK.store(false, Ordering::SeqCst);
});

// Poll the set once.
//
// Since the task wakes itself, the LocalSet should only poll it once.
assert!(rt
.block_on(futures::future::poll_fn(|cx| Poll::Ready(
set.poll_unpin(cx)
)))
.is_pending());
// Dropping the set cancels the task, which should have been polled only once.
drop(set);

assert!(rt.block_on(jh).unwrap_err().is_cancelled());
assert!(IS_OK.load(Ordering::SeqCst));
}

#[cfg(tokio_unstable)]
mod unstable {
use tokio::runtime::UnhandledPanic;
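
The new test drives the `LocalSet` through exactly one poll by hand. The same pattern works outside the test suite; a minimal sketch, assuming the `futures` crate for `FutureExt::poll_unpin` and `poll_fn` (a `LocalSet` is itself a `Future` that completes once all of its spawned tasks have finished):

```rust
use futures::future::poll_fn;
use futures::FutureExt;
use std::task::Poll;
use tokio::task::LocalSet;

fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();

    let mut set = LocalSet::new();
    set.spawn_local(async {
        // Wake ourselves and give control back to the scheduler.
        tokio::task::yield_now().await;
    });

    // Drive the `LocalSet` through exactly one poll: `poll_fn` hands us a
    // `Context`, and wrapping the result in `Poll::Ready` keeps `block_on`
    // from looping until the whole set has finished.
    let one_step = rt.block_on(poll_fn(|cx| Poll::Ready(set.poll_unpin(cx))));

    // The spawned task yielded, so the set is still pending after one poll.
    assert!(one_step.is_pending());
}
```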