diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index b26c45d3a1d..0ee69c42c7f 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -11,6 +11,7 @@ use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; +use std::ptr::NonNull; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; @@ -65,10 +66,26 @@ struct Tasks { queue: VecDeque>>, } +/// A remote scheduler entry. +/// +/// These are filled in by remote threads sending instructions to the scheduler. +enum Entry { + /// A remote thread wants to spawn a task. + Schedule(task::Notified>), + /// A remote thread wants a task to be released by the scheduler. We only + /// have access to its header. + Release(NonNull), +} + +// Safety: Used correctly, the task header is "thread safe". Ultimately the task +// is owned by the current thread executor, for which this instruction is being +// sent. +unsafe impl Send for Entry {} + /// Scheduler state shared between threads. struct Shared { /// Remote run queue - queue: Mutex>>>, + queue: Mutex>, /// Unpark the blocked thread unpark: Box, @@ -203,22 +220,27 @@ impl Inner

{ let tick = scheduler.tick; scheduler.tick = scheduler.tick.wrapping_add(1); - let next = if tick % REMOTE_FIRST_INTERVAL == 0 { - scheduler - .spawner - .pop() - .or_else(|| context.tasks.borrow_mut().queue.pop_front()) + let entry = if tick % REMOTE_FIRST_INTERVAL == 0 { + scheduler.spawner.pop().or_else(|| { + context + .tasks + .borrow_mut() + .queue + .pop_front() + .map(Entry::Schedule) + }) } else { context .tasks .borrow_mut() .queue .pop_front() + .map(Entry::Schedule) .or_else(|| scheduler.spawner.pop()) }; - match next { - Some(task) => crate::coop::budget(|| task.run()), + let entry = match entry { + Some(entry) => entry, None => { // Park until the thread is signaled scheduler.park.park().ok().expect("failed to park"); @@ -226,6 +248,29 @@ impl Inner

{ // Try polling the `block_on` future next continue 'outer; } + }; + + match entry { + Entry::Schedule(task) => crate::coop::budget(|| task.run()), + Entry::Release(ptr) => { + // Safety: the task header is only legally provided + // internally in the header, so we know that it is a + // valid (or in particular *allocated*) header that + // is part of the linked list. + unsafe { + let removed = context.tasks.borrow_mut().owned.remove(ptr); + + // TODO: This seems like it should hold, because + // there doesn't seem to be an avenue for anyone + // else to fiddle with the owned tasks + // collection *after* a remote thread has marked + // it as released, and at that point, the only + // location at which it can be removed is here + // or in the Drop implementation of the + // scheduler. + debug_assert!(removed.is_some()); + } + } } } @@ -308,8 +353,16 @@ impl Drop for BasicScheduler

{ } // Drain remote queue - for task in scheduler.spawner.shared.queue.lock().drain(..) { - task.shutdown(); + for entry in scheduler.spawner.shared.queue.lock().drain(..) { + match entry { + Entry::Schedule(task) => { + task.shutdown(); + } + Entry::Release(..) => { + // Do nothing, each entry in the linked list was *just* + // dropped by the scheduler above. + } + } } assert!(context.tasks.borrow().owned.is_empty()); @@ -337,7 +390,7 @@ impl Spawner { handle } - fn pop(&self) -> Option>> { + fn pop(&self) -> Option { self.shared.queue.lock().pop_front() } @@ -370,15 +423,19 @@ impl Schedule for Arc { } fn release(&self, task: &Task) -> Option> { - use std::ptr::NonNull; - CURRENT.with(|maybe_cx| { - let cx = maybe_cx.expect("scheduler context missing"); + let ptr = NonNull::from(task.header()); - // safety: the task is inserted in the list in `bind`. - unsafe { - let ptr = NonNull::from(task.header()); - cx.tasks.borrow_mut().owned.remove(ptr) + if let Some(cx) = maybe_cx { + // safety: the task is inserted in the list in `bind`. + unsafe { cx.tasks.borrow_mut().owned.remove(ptr) } + } else { + self.queue.lock().push_back(Entry::Release(ptr)); + self.unpark.unpark(); + // Returning `None` here prevents the task plumbing from being + // freed. It is then up to the scheduler through the queue we + // just added to, or its Drop impl to free the task. + None } }) } @@ -389,7 +446,7 @@ impl Schedule for Arc { cx.tasks.borrow_mut().queue.push_back(task); } _ => { - self.queue.lock().push_back(task); + self.queue.lock().push_back(Entry::Schedule(task)); self.unpark.unpark(); } }); diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index e84f19c3d03..1d72ac3a25c 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -24,3 +24,70 @@ fn test_abort_without_panic_3157() { let _ = handle.await; }); } + +/// Checks that a suspended task can be aborted inside of a current_thread +/// executor without panicking as reported in issue #3662: +/// . +#[test] +fn test_abort_without_panic_3662() { + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + + struct DropCheck(Arc); + + impl Drop for DropCheck { + fn drop(&mut self) { + self.0.store(true, Ordering::SeqCst); + } + } + + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + + rt.block_on(async move { + let drop_flag = Arc::new(AtomicBool::new(false)); + let drop_check = DropCheck(drop_flag.clone()); + + let j = tokio::spawn(async move { + // NB: just grab the drop check here so that it becomes part of the + // task. + let _drop_check = drop_check; + futures::future::pending::<()>().await; + }); + + let drop_flag2 = drop_flag.clone(); + + let task = std::thread::spawn(move || { + // This runs in a separate thread so it doesn't have immediate + // thread-local access to the executor. It does however transition + // the underlying task to be completed, which will cause it to be + // dropped (in this thread no less). + assert!(!drop_flag2.load(Ordering::SeqCst)); + j.abort(); + // TODO: is this guaranteed at this point? + // assert!(drop_flag2.load(Ordering::SeqCst)); + j + }) + .join() + .unwrap(); + + assert!(drop_flag.load(Ordering::SeqCst)); + let result = task.await; + assert!(result.unwrap_err().is_cancelled()); + + // Note: We do the following to trigger a deferred task cleanup. + // + // The relevant piece of code you want to look at is in: + // `Inner::block_on` of `basic_scheduler.rs`. + // + // We cause the cleanup to happen by having a poll return Pending once + // so that the scheduler can go into the "auxilliary tasks" mode, at + // which point the task is removed from the scheduler. + let i = tokio::spawn(async move { + tokio::task::yield_now().await; + }); + + i.await.unwrap(); + }); +}