Skip to content

Commit

Permalink
rt: fix panic in JoinHandle::abort() when called from other thread (#…
Browse files Browse the repository at this point in the history
…3672)

When a task registered with a current-thread scheduler is aborted from a thread outside the runtime,
the task cannot be unlinked from the runtime immediately. Instead, a message is sent to the runtime,
notifying it to remove the aborted task.

Fixes #3662
  • Loading branch information
udoprog committed Apr 8, 2021
1 parent 1d56552 commit 1a72b28
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 19 deletions.
95 changes: 76 additions & 19 deletions tokio/src/runtime/basic_scheduler.rs
Expand Up @@ -11,6 +11,7 @@ use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
Expand Down Expand Up @@ -65,10 +66,26 @@ struct Tasks {
queue: VecDeque<task::Notified<Arc<Shared>>>,
}

/// A remote scheduler entry.
///
/// These are filled in by remote threads sending instructions to the scheduler.
/// Entries travel through the shared remote queue (`Shared::queue`) and are
/// taken off that queue by the scheduler via `Spawner::pop`.
enum Entry {
/// A remote thread wants to spawn a task.
///
/// The scheduler loop handles this by running the task; on scheduler drop,
/// still-queued tasks are shut down instead.
Schedule(task::Notified<Arc<Shared>>),
/// A remote thread wants a task to be released by the scheduler. We only
/// have access to its header.
///
/// The scheduler loop handles this by removing the task behind the header
/// from its list of owned tasks.
Release(NonNull<task::Header>),
}

// Safety: `Entry::Release` carries a raw `NonNull<task::Header>`, and `NonNull`
// is never `Send` on its own, so `Send` has to be asserted manually here.
// Used correctly, the task header is "thread safe". Ultimately the task
// is owned by the current thread executor, for which this instruction is being
// sent: the remote thread only creates the pointer and enqueues it, while the
// removal from the owned-task list is performed by the scheduler itself.
unsafe impl Send for Entry {}

/// Scheduler state shared between threads.
struct Shared {
/// Remote run queue
queue: Mutex<VecDeque<task::Notified<Arc<Shared>>>>,
queue: Mutex<VecDeque<Entry>>,

/// Unpark the blocked thread
unpark: Box<dyn Unpark>,
Expand Down Expand Up @@ -203,29 +220,57 @@ impl<P: Park> Inner<P> {
let tick = scheduler.tick;
scheduler.tick = scheduler.tick.wrapping_add(1);

let next = if tick % REMOTE_FIRST_INTERVAL == 0 {
scheduler
.spawner
.pop()
.or_else(|| context.tasks.borrow_mut().queue.pop_front())
let entry = if tick % REMOTE_FIRST_INTERVAL == 0 {
scheduler.spawner.pop().or_else(|| {
context
.tasks
.borrow_mut()
.queue
.pop_front()
.map(Entry::Schedule)
})
} else {
context
.tasks
.borrow_mut()
.queue
.pop_front()
.map(Entry::Schedule)
.or_else(|| scheduler.spawner.pop())
};

match next {
Some(task) => crate::coop::budget(|| task.run()),
let entry = match entry {
Some(entry) => entry,
None => {
// Park until the thread is signaled
scheduler.park.park().expect("failed to park");

// Try polling the `block_on` future next
continue 'outer;
}
};

match entry {
Entry::Schedule(task) => crate::coop::budget(|| task.run()),
Entry::Release(ptr) => {
// Safety: the task header is only legally provided
// internally in the header, so we know that it is a
// valid (or in particular *allocated*) header that
// is part of the linked list.
unsafe {
let removed = context.tasks.borrow_mut().owned.remove(ptr);

// TODO: This seems like it should hold, because
// there doesn't seem to be an avenue for anyone
// else to fiddle with the owned tasks
// collection *after* a remote thread has marked
// it as released, and at that point, the only
// location at which it can be removed is here
// or in the Drop implementation of the
// scheduler.
debug_assert!(removed.is_some());
}
}
}
}

Expand Down Expand Up @@ -307,8 +352,16 @@ impl<P: Park> Drop for BasicScheduler<P> {
}

// Drain remote queue
for task in scheduler.spawner.shared.queue.lock().drain(..) {
task.shutdown();
for entry in scheduler.spawner.shared.queue.lock().drain(..) {
match entry {
Entry::Schedule(task) => {
task.shutdown();
}
Entry::Release(..) => {
// Do nothing, each entry in the linked list was *just*
// dropped by the scheduler above.
}
}
}

assert!(context.tasks.borrow().owned.is_empty());
Expand Down Expand Up @@ -336,7 +389,7 @@ impl Spawner {
handle
}

fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
/// Takes the oldest entry off the shared remote queue.
///
/// Returns `None` when no remote entry is currently queued.
fn pop(&self) -> Option<Entry> {
    // The lock is held only for the duration of the pop.
    let mut remote = self.shared.queue.lock();
    remote.pop_front()
}

Expand Down Expand Up @@ -369,15 +422,19 @@ impl Schedule for Arc<Shared> {
}

fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
use std::ptr::NonNull;

CURRENT.with(|maybe_cx| {
let cx = maybe_cx.expect("scheduler context missing");
let ptr = NonNull::from(task.header());

// safety: the task is inserted in the list in `bind`.
unsafe {
let ptr = NonNull::from(task.header());
cx.tasks.borrow_mut().owned.remove(ptr)
if let Some(cx) = maybe_cx {
// safety: the task is inserted in the list in `bind`.
unsafe { cx.tasks.borrow_mut().owned.remove(ptr) }
} else {
self.queue.lock().push_back(Entry::Release(ptr));
self.unpark.unpark();
// Returning `None` here prevents the task plumbing from being
// freed. It is then up to the scheduler through the queue we
// just added to, or its Drop impl to free the task.
None
}
})
}
Expand All @@ -388,7 +445,7 @@ impl Schedule for Arc<Shared> {
cx.tasks.borrow_mut().queue.push_back(task);
}
_ => {
self.queue.lock().push_back(task);
self.queue.lock().push_back(Entry::Schedule(task));
self.unpark.unpark();
}
});
Expand Down
67 changes: 67 additions & 0 deletions tokio/tests/task_abort.rs
Expand Up @@ -24,3 +24,70 @@ fn test_abort_without_panic_3157() {
let _ = handle.await;
});
}

/// Regression test for issue #3662:
/// <https://github.com/tokio-rs/tokio/issues/3662>.
///
/// A suspended task running on a current_thread executor is aborted from a
/// separate thread; doing so must not panic.
#[test]
fn test_abort_without_panic_3662() {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    /// Flips the shared flag to `true` when dropped.
    struct DropCheck(Arc<AtomicBool>);

    impl Drop for DropCheck {
        fn drop(&mut self) {
            self.0.store(true, Ordering::SeqCst);
        }
    }

    let rt = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();

    rt.block_on(async move {
        let dropped = Arc::new(AtomicBool::new(false));
        let guard = DropCheck(Arc::clone(&dropped));

        let pending_task = tokio::spawn(async move {
            // Moving the guard into the future makes it part of the task, so
            // dropping the task is observable through the flag.
            let _guard = guard;
            futures::future::pending::<()>().await;
        });

        let dropped_remote = Arc::clone(&dropped);

        // The abort is issued from a separate thread, which has no immediate
        // thread-local access to the executor. Aborting still transitions the
        // underlying task to completed, which causes it to be dropped (on
        // that thread, no less).
        let aborter = std::thread::spawn(move || {
            assert!(!dropped_remote.load(Ordering::SeqCst));
            pending_task.abort();
            // NOTE: whether the flag is guaranteed to be set right here is an
            // open question, so it is only asserted after the thread has
            // been joined.
            pending_task
        });
        let pending_task = aborter.join().unwrap();

        assert!(dropped.load(Ordering::SeqCst));
        let outcome = pending_task.await;
        assert!(outcome.unwrap_err().is_cancelled());

        // Trigger the deferred task cleanup.
        //
        // The relevant code lives in `Inner::block_on` of
        // `basic_scheduler.rs`. Having a poll return `Pending` once lets the
        // scheduler enter its "auxiliary tasks" mode, at which point the
        // aborted task is removed from the scheduler.
        let yielder = tokio::spawn(async move {
            tokio::task::yield_now().await;
        });

        yielder.await.unwrap();
    });
}

0 comments on commit 1a72b28

Please sign in to comment.