runtime: immediately drop new task when runtime is shut down (#3752)
Darksonn committed May 26, 2021
1 parent 4abeca7 commit a39e6c2
Showing 6 changed files with 116 additions and 22 deletions.
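
In short: spawning a task through a `Handle` after its `Runtime` has been dropped now cancels the task immediately instead of silently leaking it, so the `JoinHandle` resolves with a cancellation error. A minimal sketch of the guaranteed behavior, mirroring the tests added in this commit (it assumes the futures crate for `now_or_never`):

use futures::future::FutureExt;
use tokio::runtime::{Builder, Handle};

fn main() {
    let rt = Builder::new_current_thread().build().unwrap();
    let handle = rt.block_on(async { Handle::current() });

    drop(rt); // shut the runtime down

    // The new task is dropped immediately, so its JoinHandle is already
    // resolved with a "cancelled" error rather than pending forever.
    let jh = handle.spawn(futures::future::pending::<()>());
    let err = jh.now_or_never().unwrap().unwrap_err();
    assert!(err.is_cancelled());
}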
71 changes: 53 additions & 18 deletions tokio/src/runtime/basic_scheduler.rs
@@ -84,13 +84,13 @@ unsafe impl Send for Entry {}

 /// Scheduler state shared between threads.
 struct Shared {
-    /// Remote run queue
-    queue: Mutex<VecDeque<Entry>>,
+    /// Remote run queue. None if the `Runtime` has been dropped.
+    queue: Mutex<Option<VecDeque<Entry>>>,

-    /// Unpark the blocked thread
+    /// Unpark the blocked thread.
     unpark: Box<dyn Unpark>,

-    // indicates whether the blocked on thread was woken
+    /// Indicates whether the blocked on thread was woken.
     woken: AtomicBool,
 }
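
Wrapping the queue in `Option` makes it double as a shutdown flag: `None` means the `Runtime` is gone. A standalone sketch of this pattern, with hypothetical names rather than the actual tokio internals:

use std::collections::VecDeque;
use std::sync::Mutex;

struct CloseableQueue<T> {
    inner: Mutex<Option<VecDeque<T>>>,
}

impl<T> CloseableQueue<T> {
    fn new() -> Self {
        CloseableQueue { inner: Mutex::new(Some(VecDeque::new())) }
    }

    // Returns the rejected value if the queue has been closed, so the
    // caller can decide how to dispose of it (tokio calls `shutdown`).
    fn push(&self, value: T) -> Result<(), T> {
        match self.inner.lock().unwrap().as_mut() {
            Some(queue) => {
                queue.push_back(value);
                Ok(())
            }
            None => Err(value),
        }
    }

    // Closes the queue (subsequent pushes fail) and hands back any
    // pending values for cleanup.
    fn close(&self) -> Vec<T> {
        self.inner.lock().unwrap().take().into_iter().flatten().collect()
    }
}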

@@ -124,7 +124,7 @@ impl<P: Park> BasicScheduler<P> {

         let spawner = Spawner {
             shared: Arc::new(Shared {
-                queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
+                queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
                 unpark: unpark as Box<dyn Unpark>,
                 woken: AtomicBool::new(false),
             }),
@@ -351,18 +351,29 @@ impl<P: Park> Drop for BasicScheduler<P> {
                 task.shutdown();
             }

-            // Drain remote queue
-            for entry in scheduler.spawner.shared.queue.lock().drain(..) {
-                match entry {
-                    Entry::Schedule(task) => {
-                        task.shutdown();
-                    }
-                    Entry::Release(..) => {
-                        // Do nothing, each entry in the linked list was *just*
-                        // dropped by the scheduler above.
-                    }
-                }
-            }
+            // Drain remote queue and set it to None
+            let mut remote_queue = scheduler.spawner.shared.queue.lock();
+
+            // Using `Option::take` to replace the shared queue with `None`.
+            if let Some(remote_queue) = remote_queue.take() {
+                for entry in remote_queue {
+                    match entry {
+                        Entry::Schedule(task) => {
+                            task.shutdown();
+                        }
+                        Entry::Release(..) => {
+                            // Do nothing, each entry in the linked list was *just*
+                            // dropped by the scheduler above.
+                        }
+                    }
+                }
+            }
+            // By dropping the mutex lock after the full duration of the above loop,
+            // any thread that sees the queue in the `None` state is guaranteed that
+            // the runtime has fully shut down.
+            //
+            // The assert below is unrelated to this mutex.
+            drop(remote_queue);

             assert!(context.tasks.borrow().owned.is_empty());
         });
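
The comment above encodes a subtle ordering guarantee: because the destructor holds the mutex for the entire drain, any thread that later locks the queue and observes `None` knows every queued entry has already been processed. A self-contained sketch of that invariant (illustrative types only):

use std::sync::Mutex;

type Cleanup = Box<dyn FnOnce() + Send>;

// Hold the lock across the whole drain: an observer that locks the
// mutex and sees `None` can conclude every queued cleanup already ran.
fn close_and_drain(queue: &Mutex<Option<Vec<Cleanup>>>) {
    let mut guard = queue.lock().unwrap();
    if let Some(entries) = guard.take() {
        for cleanup in entries {
            cleanup(); // runs while the lock is still held
        }
    }
    drop(guard); // only now can other threads observe `None`
}

fn try_submit(queue: &Mutex<Option<Vec<Cleanup>>>, job: Cleanup) {
    match queue.lock().unwrap().as_mut() {
        Some(entries) => entries.push(job),
        // `None` implies `close_and_drain` has fully completed.
        None => drop(job),
    }
}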
@@ -390,7 +401,10 @@ impl Spawner {
     }

     fn pop(&self) -> Option<Entry> {
-        self.shared.queue.lock().pop_front()
+        match self.shared.queue.lock().as_mut() {
+            Some(queue) => queue.pop_front(),
+            None => None,
+        }
     }

     fn waker_ref(&self) -> WakerRef<'_> {
@@ -429,7 +443,19 @@ impl Schedule for Arc<Shared> {
                 // safety: the task is inserted in the list in `bind`.
                 unsafe { cx.tasks.borrow_mut().owned.remove(ptr) }
             } else {
-                self.queue.lock().push_back(Entry::Release(ptr));
+                // By sending an `Entry::Release` to the runtime, we ask the
+                // runtime to remove this task from the linked list in
+                // `Tasks::owned`.
+                //
+                // If the queue is `None`, then the task was already removed
+                // from that list in the destructor of `BasicScheduler`. We do
+                // not do anything in this case for the same reason that
+                // `Entry::Release` messages are ignored in the remote queue
+                // drain loop of `BasicScheduler`'s destructor.
+                if let Some(queue) = self.queue.lock().as_mut() {
+                    queue.push_back(Entry::Release(ptr));
+                }
+
                 self.unpark.unpark();
                 // Returning `None` here prevents the task plumbing from being
                 // freed. It is then up to the scheduler through the queue we
@@ -445,8 +471,17 @@ impl Schedule for Arc<Shared> {
                 cx.tasks.borrow_mut().queue.push_back(task);
             }
             _ => {
-                self.queue.lock().push_back(Entry::Schedule(task));
-                self.unpark.unpark();
+                let mut guard = self.queue.lock();
+                if let Some(queue) = guard.as_mut() {
+                    queue.push_back(Entry::Schedule(task));
+                    drop(guard);
+                    self.unpark.unpark();
+                } else {
+                    // The runtime has shut down. We drop the new task
+                    // immediately.
+                    drop(guard);
+                    task.shutdown();
+                }
             }
         });
     }
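
Note the lock discipline in the new fallback branch: the guard is dropped before `task.shutdown()`, because shutting the task down runs its destructor, which may re-enter the scheduler and try to take the same lock. A sketch of the shape of this pattern (illustrative names, not the tokio internals):

use std::sync::Mutex;

// Illustrative stand-in for the scheduler's shared state.
struct SharedState {
    queue: Mutex<Option<Vec<String>>>,
}

impl SharedState {
    fn schedule(&self, task: String) {
        let mut guard = self.queue.lock().unwrap();
        match guard.as_mut() {
            Some(queue) => {
                queue.push(task);
                drop(guard); // unlock before waking the worker
                // self.unpark() would go here
            }
            None => {
                // Runtime already shut down: unlock *before* disposing of
                // the task, since its destructor may call back into
                // `schedule` and would deadlock on a held, non-reentrant
                // mutex.
                drop(guard);
                drop(task); // stands in for `task.shutdown()`
            }
        }
    }
}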
12 changes: 9 additions & 3 deletions tokio/src/runtime/queue.rs
@@ -109,7 +109,10 @@ impl<T> Local<T> {
     }

     /// Pushes a task to the back of the local queue, skipping the LIFO slot.
-    pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) {
+    pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>)
+    where
+        T: crate::runtime::task::Schedule,
+    {
         let tail = loop {
             let head = self.inner.head.load(Acquire);
             let (steal, real) = unpack(head);
@@ -504,15 +507,18 @@ impl<T: 'static> Inject<T> {
     }

     /// Pushes a value into the queue.
-    pub(super) fn push(&self, task: task::Notified<T>) {
+    pub(super) fn push(&self, task: task::Notified<T>)
+    where
+        T: crate::runtime::task::Schedule,
+    {
         // Acquire queue lock
         let mut p = self.pointers.lock();

         if p.is_closed {
             // Drop the mutex to avoid a potential deadlock when
             // re-entering.
             drop(p);
-            drop(task);
+            task.shutdown();
             return;
         }
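
Both `push` functions gain a `T: crate::runtime::task::Schedule` bound for one reason: it is what makes `task.shutdown()` callable, so a task arriving at a closed queue can be cancelled (waking its `JoinHandle`) rather than silently dropped. A toy illustration of a method gated behind such a bound (hypothetical types, not tokio's real task plumbing):

// A task handle whose `shutdown` is only available when the scheduler
// type implements `Schedule`, mirroring why the `where` clauses above
// are needed before `task.shutdown()` will compile.
trait Schedule {
    fn cancel(&self);
}

struct Notified<T>(T);

impl<T: Schedule> Notified<T> {
    /// Cancel the task and notify any waiters, instead of leaking it.
    fn shutdown(self) {
        self.0.cancel();
    }
}

struct TestScheduler;
impl Schedule for TestScheduler {
    fn cancel(&self) {
        println!("task cancelled; its JoinHandle would now resolve");
    }
}

fn main() {
    let task = Notified(TestScheduler);
    task.shutdown(); // compiles only because `TestScheduler: Schedule`
}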
28 changes: 28 additions & 0 deletions tokio/src/runtime/tests/loom_shutdown_join.rs
@@ -0,0 +1,28 @@
+use crate::runtime::{Builder, Handle};
+
+#[test]
+fn join_handle_cancel_on_shutdown() {
+    let mut builder = loom::model::Builder::new();
+    builder.preemption_bound = Some(2);
+    builder.check(|| {
+        use futures::future::FutureExt;
+
+        let rt = Builder::new_multi_thread()
+            .worker_threads(2)
+            .build()
+            .unwrap();
+
+        let handle = rt.block_on(async move { Handle::current() });
+
+        let jh1 = handle.spawn(futures::future::pending::<()>());
+
+        drop(rt);
+
+        let jh2 = handle.spawn(futures::future::pending::<()>());
+
+        let err1 = jh1.now_or_never().unwrap().unwrap_err();
+        let err2 = jh2.now_or_never().unwrap().unwrap_err();
+        assert!(err1.is_cancelled());
+        assert!(err2.is_cancelled());
+    });
+}
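
This new test is a loom model: `builder.check` runs the closure under the thread interleavings loom can enumerate, and `preemption_bound = Some(2)` caps preemptions per execution to keep the search tractable. A minimal skeleton of the same shape (assumes the loom crate; tokio compiles such tests only under `--cfg loom`):

use loom::sync::atomic::{AtomicUsize, Ordering};
use loom::sync::Arc;
use loom::thread;

#[test]
fn minimal_loom_model() {
    let mut builder = loom::model::Builder::new();
    // Explore interleavings with at most two preemptions each.
    builder.preemption_bound = Some(2);
    builder.check(|| {
        let value = Arc::new(AtomicUsize::new(0));
        let clone = value.clone();

        let th = thread::spawn(move || {
            clone.store(1, Ordering::SeqCst);
        });
        th.join().unwrap();

        // Checked on every interleaving the model explores.
        assert_eq!(value.load(Ordering::SeqCst), 1);
    });
}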
1 change: 1 addition & 0 deletions tokio/src/runtime/tests/mod.rs
@@ -4,6 +4,7 @@ cfg_loom! {
     mod loom_oneshot;
     mod loom_pool;
     mod loom_queue;
+    mod loom_shutdown_join;
 }

 cfg_not_loom! {
4 changes: 3 additions & 1 deletion tokio/src/runtime/thread_pool/worker.rs
@@ -823,7 +823,9 @@ impl Shared {
         }

         // Drain the injection queue
-        while self.inject.pop().is_some() {}
+        while let Some(task) = self.inject.pop() {
+            task.shutdown();
+        }
     }

     fn ptr_eq(&self, other: &Shared) -> bool {
22 changes: 22 additions & 0 deletions tokio/tests/rt_handle_block_on.rs
@@ -388,6 +388,28 @@ rt_test! {

         rt.block_on(async { some_non_async_function() });
     }
+
+    #[test]
+    fn spawn_after_runtime_dropped() {
+        use futures::future::FutureExt;
+
+        let rt = rt();
+
+        let handle = rt.block_on(async move {
+            Handle::current()
+        });
+
+        let jh1 = handle.spawn(futures::future::pending::<()>());
+
+        drop(rt);
+
+        let jh2 = handle.spawn(futures::future::pending::<()>());
+
+        let err1 = jh1.now_or_never().unwrap().unwrap_err();
+        let err2 = jh2.now_or_never().unwrap().unwrap_err();
+        assert!(err1.is_cancelled());
+        assert!(err2.is_cancelled());
+    }
 }

 multi_threaded_rt_test! {

1 comment on commit a39e6c2

@github-actions
⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'sync_mpsc': the result for this commit is worse than the previous result by more than the alert threshold of 2.

Benchmark suite    Current: a39e6c2          Previous: 4abeca7         Ratio
send_large         58437 ns/iter (± 7804)    27153 ns/iter (± 3075)    2.15

This comment was automatically generated by a workflow using github-action-benchmark.

CC: @tokio-rs/maintainers
