diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index ffe0bca74e6..cbd8e58bb33 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -84,13 +84,13 @@ unsafe impl Send for Entry {}
 
 /// Scheduler state shared between threads.
 struct Shared {
-    /// Remote run queue
-    queue: Mutex<VecDeque<Entry>>,
+    /// Remote run queue. None if the `Runtime` has been dropped.
+    queue: Mutex<Option<VecDeque<Entry>>>,
 
-    /// Unpark the blocked thread
+    /// Unpark the blocked thread.
     unpark: Box<dyn Unpark>,
 
-    // indicates whether the blocked on thread was woken
+    /// Indicates whether the blocked on thread was woken.
     woken: AtomicBool,
 }
 
@@ -124,7 +124,7 @@ impl<P: Park> BasicScheduler<P> {
         let spawner = Spawner {
             shared: Arc::new(Shared {
-                queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
+                queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
                 unpark: unpark as Box<dyn Unpark>,
                 woken: AtomicBool::new(false),
             }),
         };
@@ -351,18 +351,29 @@ impl<P: Park> Drop for BasicScheduler<P> {
                 task.shutdown();
             }
 
-            // Drain remote queue
-            for entry in scheduler.spawner.shared.queue.lock().drain(..) {
-                match entry {
-                    Entry::Schedule(task) => {
-                        task.shutdown();
-                    }
-                    Entry::Release(..) => {
-                        // Do nothing, each entry in the linked list was *just*
-                        // dropped by the scheduler above.
+            // Drain remote queue and set it to None
+            let mut remote_queue = scheduler.spawner.shared.queue.lock();
+
+            // Using `Option::take` to replace the shared queue with `None`.
+            if let Some(remote_queue) = remote_queue.take() {
+                for entry in remote_queue {
+                    match entry {
+                        Entry::Schedule(task) => {
+                            task.shutdown();
+                        }
+                        Entry::Release(..) => {
+                            // Do nothing, each entry in the linked list was *just*
+                            // dropped by the scheduler above.
+                        }
                     }
                 }
             }
+            // By dropping the mutex lock after the full duration of the above loop,
+            // any thread that sees the queue in the `None` state is guaranteed that
+            // the runtime has fully shut down.
+            //
+            // The assert below is unrelated to this mutex.
+            drop(remote_queue);
 
             assert!(context.tasks.borrow().owned.is_empty());
         });
@@ -390,7 +401,10 @@ impl Spawner {
     }
 
     fn pop(&self) -> Option<Entry> {
-        self.shared.queue.lock().pop_front()
+        match self.shared.queue.lock().as_mut() {
+            Some(queue) => queue.pop_front(),
+            None => None,
+        }
     }
 
     fn waker_ref(&self) -> WakerRef<'_> {
@@ -429,7 +443,19 @@ impl Schedule for Arc<Shared> {
                 // safety: the task is inserted in the list in `bind`.
                 unsafe { cx.tasks.borrow_mut().owned.remove(ptr) }
             } else {
-                self.queue.lock().push_back(Entry::Release(ptr));
+                // By sending an `Entry::Release` to the runtime, we ask the
+                // runtime to remove this task from the linked list in
+                // `Tasks::owned`.
+                //
+                // If the queue is `None`, then the task was already removed
+                // from that list in the destructor of `BasicScheduler`. We do
+                // not do anything in this case for the same reason that
+                // `Entry::Release` messages are ignored in the remote queue
+                // drain loop of `BasicScheduler`'s destructor.
+                if let Some(queue) = self.queue.lock().as_mut() {
+                    queue.push_back(Entry::Release(ptr));
+                }
+
                 self.unpark.unpark();
                 // Returning `None` here prevents the task plumbing from being
                 // freed. It is then up to the scheduler through the queue we
@@ -445,8 +471,17 @@ impl Schedule for Arc<Shared> {
                 cx.tasks.borrow_mut().queue.push_back(task);
             }
             _ => {
-                self.queue.lock().push_back(Entry::Schedule(task));
-                self.unpark.unpark();
+                let mut guard = self.queue.lock();
+                if let Some(queue) = guard.as_mut() {
+                    queue.push_back(Entry::Schedule(task));
+                    drop(guard);
+                    self.unpark.unpark();
+                } else {
+                    // The runtime has shut down. We drop the new task
+                    // immediately.
+                    drop(guard);
+                    task.shutdown();
+                }
             }
         });
     }
diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs
index b300e1ae6ad..51c8cb24001 100644
--- a/tokio/src/runtime/queue.rs
+++ b/tokio/src/runtime/queue.rs
@@ -109,7 +109,10 @@ impl<T> Local<T> {
     }
 
     /// Pushes a task to the back of the local queue, skipping the LIFO slot.
-    pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) {
+    pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>)
+    where
+        T: crate::runtime::task::Schedule,
+    {
         let tail = loop {
             let head = self.inner.head.load(Acquire);
             let (steal, real) = unpack(head);
@@ -504,7 +507,10 @@ impl<T: 'static> Inject<T> {
     }
 
     /// Pushes a value into the queue.
-    pub(super) fn push(&self, task: task::Notified<T>) {
+    pub(super) fn push(&self, task: task::Notified<T>)
+    where
+        T: crate::runtime::task::Schedule,
+    {
         // Acquire queue lock
         let mut p = self.pointers.lock();
 
@@ -512,7 +518,7 @@ impl<T: 'static> Inject<T> {
             // Drop the mutex to avoid a potential deadlock when
             // re-entering.
             drop(p);
-            drop(task);
+            task.shutdown();
             return;
         }
 
diff --git a/tokio/src/runtime/tests/loom_shutdown_join.rs b/tokio/src/runtime/tests/loom_shutdown_join.rs
new file mode 100644
index 00000000000..6fbc4bfdedf
--- /dev/null
+++ b/tokio/src/runtime/tests/loom_shutdown_join.rs
@@ -0,0 +1,28 @@
+use crate::runtime::{Builder, Handle};
+
+#[test]
+fn join_handle_cancel_on_shutdown() {
+    let mut builder = loom::model::Builder::new();
+    builder.preemption_bound = Some(2);
+    builder.check(|| {
+        use futures::future::FutureExt;
+
+        let rt = Builder::new_multi_thread()
+            .worker_threads(2)
+            .build()
+            .unwrap();
+
+        let handle = rt.block_on(async move { Handle::current() });
+
+        let jh1 = handle.spawn(futures::future::pending::<()>());
+
+        drop(rt);
+
+        let jh2 = handle.spawn(futures::future::pending::<()>());
+
+        let err1 = jh1.now_or_never().unwrap().unwrap_err();
+        let err2 = jh2.now_or_never().unwrap().unwrap_err();
+        assert!(err1.is_cancelled());
+        assert!(err2.is_cancelled());
+    });
+}
diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs
index ebb48de5290..c84ba1bc492 100644
--- a/tokio/src/runtime/tests/mod.rs
+++ b/tokio/src/runtime/tests/mod.rs
@@ -4,6 +4,7 @@ cfg_loom! {
     mod loom_oneshot;
     mod loom_pool;
     mod loom_queue;
+    mod loom_shutdown_join;
 }
 
 cfg_not_loom! {
diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs
index 86d3f91b584..348862c58fd 100644
--- a/tokio/src/runtime/thread_pool/worker.rs
+++ b/tokio/src/runtime/thread_pool/worker.rs
@@ -823,7 +823,9 @@ impl Shared {
         }
 
         // Drain the injection queue
-        while self.inject.pop().is_some() {}
+        while let Some(task) = self.inject.pop() {
+            task.shutdown();
+        }
     }
 
     fn ptr_eq(&self, other: &Shared) -> bool {
diff --git a/tokio/tests/rt_handle_block_on.rs b/tokio/tests/rt_handle_block_on.rs
index 5234258be11..17878c8d239 100644
--- a/tokio/tests/rt_handle_block_on.rs
+++ b/tokio/tests/rt_handle_block_on.rs
@@ -388,6 +388,28 @@ rt_test! {
 
         rt.block_on(async { some_non_async_function() });
     }
+
+    #[test]
+    fn spawn_after_runtime_dropped() {
+        use futures::future::FutureExt;
+
+        let rt = rt();
+
+        let handle = rt.block_on(async move {
+            Handle::current()
+        });
+
+        let jh1 = handle.spawn(futures::future::pending::<()>());
+
+        drop(rt);
+
+        let jh2 = handle.spawn(futures::future::pending::<()>());
+
+        let err1 = jh1.now_or_never().unwrap().unwrap_err();
+        let err2 = jh2.now_or_never().unwrap().unwrap_err();
+        assert!(err1.is_cancelled());
+        assert!(err2.is_cancelled());
+    }
 }
 
 multi_threaded_rt_test! {