Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent basic_scheduler from panicking if a task is aborted from another thread #3672

Merged
merged 4 commits into from Apr 8, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
95 changes: 76 additions & 19 deletions tokio/src/runtime/basic_scheduler.rs
Expand Up @@ -11,6 +11,7 @@ use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
Expand Down Expand Up @@ -65,10 +66,26 @@ struct Tasks {
queue: VecDeque<task::Notified<Arc<Shared>>>,
}

/// A remote scheduler entry.
///
/// These are filled in by remote threads sending instructions to the scheduler.
enum Entry {
/// A remote thread wants to spawn a task.
Schedule(task::Notified<Arc<Shared>>),
/// A remote thread wants a task to be released by the scheduler. We only
/// have access to its header.
Release(NonNull<task::Header>),
}

// Safety: Used correctly, the task header is "thread safe". Ultimately the task
// is owned by the current thread executor, for which this instruction is being
// sent.
unsafe impl Send for Entry {}

/// Scheduler state shared between threads.
struct Shared {
/// Remote run queue
queue: Mutex<VecDeque<task::Notified<Arc<Shared>>>>,
queue: Mutex<VecDeque<Entry>>,

/// Unpark the blocked thread
unpark: Box<dyn Unpark>,
Expand Down Expand Up @@ -203,29 +220,57 @@ impl<P: Park> Inner<P> {
let tick = scheduler.tick;
scheduler.tick = scheduler.tick.wrapping_add(1);

let next = if tick % REMOTE_FIRST_INTERVAL == 0 {
scheduler
.spawner
.pop()
.or_else(|| context.tasks.borrow_mut().queue.pop_front())
let entry = if tick % REMOTE_FIRST_INTERVAL == 0 {
scheduler.spawner.pop().or_else(|| {
context
.tasks
.borrow_mut()
.queue
.pop_front()
.map(Entry::Schedule)
})
} else {
context
.tasks
.borrow_mut()
.queue
.pop_front()
.map(Entry::Schedule)
.or_else(|| scheduler.spawner.pop())
};

match next {
Some(task) => crate::coop::budget(|| task.run()),
let entry = match entry {
Some(entry) => entry,
None => {
// Park until the thread is signaled
scheduler.park.park().ok().expect("failed to park");

// Try polling the `block_on` future next
continue 'outer;
}
};

match entry {
Entry::Schedule(task) => crate::coop::budget(|| task.run()),
Entry::Release(ptr) => {
// Safety: the task header is only legally provided
// internally in the header, so we know that it is a
// valid (or in particular *allocated*) header that
// is part of the linked list.
unsafe {
let removed = context.tasks.borrow_mut().owned.remove(ptr);

// TODO: This seems like it should hold, because
// there doesn't seem to be an avenue for anyone
// else to fiddle with the owned tasks
// collection *after* a remote thread has marked
// it as released, and at that point, the only
// location at which it can be removed is here
// or in the Drop implementation of the
// scheduler.
debug_assert!(removed.is_some());
}
}
}
}

Expand Down Expand Up @@ -308,8 +353,16 @@ impl<P: Park> Drop for BasicScheduler<P> {
}

// Drain remote queue
for task in scheduler.spawner.shared.queue.lock().drain(..) {
task.shutdown();
for entry in scheduler.spawner.shared.queue.lock().drain(..) {
match entry {
Entry::Schedule(task) => {
task.shutdown();
}
Entry::Release(..) => {
// Do nothing, each entry in the linked list was *just*
// dropped by the scheduler above.
}
}
}

assert!(context.tasks.borrow().owned.is_empty());
Expand Down Expand Up @@ -337,7 +390,7 @@ impl Spawner {
handle
}

fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
fn pop(&self) -> Option<Entry> {
self.shared.queue.lock().pop_front()
}

Expand Down Expand Up @@ -370,15 +423,19 @@ impl Schedule for Arc<Shared> {
}

fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
use std::ptr::NonNull;

CURRENT.with(|maybe_cx| {
let cx = maybe_cx.expect("scheduler context missing");
let ptr = NonNull::from(task.header());

// safety: the task is inserted in the list in `bind`.
unsafe {
let ptr = NonNull::from(task.header());
cx.tasks.borrow_mut().owned.remove(ptr)
if let Some(cx) = maybe_cx {
// safety: the task is inserted in the list in `bind`.
unsafe { cx.tasks.borrow_mut().owned.remove(ptr) }
} else {
self.queue.lock().push_back(Entry::Release(ptr));
self.unpark.unpark();
// Returning `None` here prevents the task plumbing from being
// freed. It is then up to the scheduler through the queue we
// just added to, or its Drop impl to free the task.
None
}
})
}
Expand All @@ -389,7 +446,7 @@ impl Schedule for Arc<Shared> {
cx.tasks.borrow_mut().queue.push_back(task);
}
_ => {
self.queue.lock().push_back(task);
self.queue.lock().push_back(Entry::Schedule(task));
self.unpark.unpark();
}
});
Expand Down
80 changes: 80 additions & 0 deletions tokio/tests/task_abort.rs
Expand Up @@ -24,3 +24,83 @@ fn test_abort_without_panic_3157() {
let _ = handle.await;
});
}

/// Checks that a suspended task can be aborted inside of a current_thread
/// executor without panicking as reported in issue #3662:
/// <https://github.com/tokio-rs/tokio/issues/3662>.
#[test]
fn test_abort_without_panic_3662() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::Poll;

struct DropCheck(Arc<AtomicBool>);

impl Drop for DropCheck {
fn drop(&mut self) {
self.0.store(true, Ordering::SeqCst);
}
}

let rt = tokio::runtime::Builder::new_current_thread()
.worker_threads(1)
udoprog marked this conversation as resolved.
Show resolved Hide resolved
.build()
.unwrap();

rt.block_on(async move {
let drop_flag = Arc::new(AtomicBool::new(false));
let drop_check = DropCheck(drop_flag.clone());

let j = tokio::spawn(async move {
// NB: just grab the drop check here so that it becomes part of the
// task.
let _drop_check = drop_check;
futures::future::pending::<()>().await;
udoprog marked this conversation as resolved.
Show resolved Hide resolved
});

let drop_flag2 = drop_flag.clone();

let task = tokio::task::spawn_blocking(move || {
udoprog marked this conversation as resolved.
Show resolved Hide resolved
// This runs in a separate thread so it doesn't have immediate
// thread-local access to the executor. It does however transition
// the underlying task to be completed, which will cause it to be
// dropped (in this thread no less).
assert!(!drop_flag2.load(Ordering::SeqCst));
j.abort();
// TODO: is this guaranteed at this point?
// assert!(drop_flag2.load(Ordering::SeqCst));
Comment on lines +62 to +69
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well if the task's destructor runs in this thread, it would be, but it's unclear to me that the destructor would necessarily run here.

j
})
.await
.unwrap();

assert!(drop_flag.load(Ordering::SeqCst));
let result = task.await;
assert!(result.unwrap_err().is_cancelled());

// Note: We do the following to trigger a deferred task cleanup.
//
// The relevant piece of code you want to look at is in:
// `Inner::block_on` of `basic_scheduler.rs`.
//
// We cause the cleanup to happen by having a poll return Pending once
// so that the scheduler can go into the "auxilliary tasks" mode, at
// which point the task is removed from the scheduler.
let i = tokio::spawn(async move {
let mut first = true;

let task = futures::future::poll_fn(|cx| {
if std::mem::take(&mut first) {
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(())
}
});
udoprog marked this conversation as resolved.
Show resolved Hide resolved

task.await;
});

i.await.unwrap();
});
}