From c6221e82b9f0973cd6c6d5ae67842434c3add9dc Mon Sep 17 00:00:00 2001 From: John-John Tedro Date: Sun, 4 Apr 2021 10:25:16 +0200 Subject: [PATCH 1/4] Prevent basic scheduler panic on task release (fixes #3662) --- tokio/src/runtime/basic_scheduler.rs | 14 ++++---- tokio/tests/task_abort.rs | 48 ++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 6 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index b26c45d3a1d..1541d4245a3 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -373,12 +373,14 @@ impl Schedule for Arc { use std::ptr::NonNull; CURRENT.with(|maybe_cx| { - let cx = maybe_cx.expect("scheduler context missing"); - - // safety: the task is inserted in the list in `bind`. - unsafe { - let ptr = NonNull::from(task.header()); - cx.tasks.borrow_mut().owned.remove(ptr) + if let Some(cx) = maybe_cx { + // safety: the task is inserted in the list in `bind`. + unsafe { + let ptr = NonNull::from(task.header()); + cx.tasks.borrow_mut().owned.remove(ptr) + } + } else { + None } }) } diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index e84f19c3d03..3b9e464313f 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -24,3 +24,51 @@ fn test_abort_without_panic_3157() { let _ = handle.await; }); } + +/// Checks that a suspended task can be aborted inside of a current_thread +/// executor without panicking as reported in issue #3662: +/// . +#[test] +fn test_abort_without_panic_3662() { + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + + struct DropCheck(Arc); + + impl Drop for DropCheck { + fn drop(&mut self) { + self.0.store(true, Ordering::SeqCst); + } + } + + let rt = tokio::runtime::Builder::new_current_thread() + .worker_threads(1) + .build() + .unwrap(); + + rt.block_on(async move { + let drop_flag = Arc::new(AtomicBool::new(false)); + let drop_flag2 = drop_flag.clone(); + + let j = tokio::spawn(async move { + let drop_check = DropCheck(drop_flag2); + futures::future::pending::<()>().await; + drop(drop_check); + }); + + let task = tokio::task::spawn_blocking(move || { + // This runs in a separate thread so it doesn't have immediate + // thread-local access to the executor. It does however transition + // the underlying task to be completed, which will cause it to be + // dropped (in this thread no less). + j.abort(); + j + }) + .await + .unwrap(); + + assert!(drop_flag.load(Ordering::SeqCst)); + let result = task.await; + assert!(result.unwrap_err().is_cancelled()); + }); +} From 119a854e5728a7b5aea3d03fdc73c81512015e37 Mon Sep 17 00:00:00 2001 From: John-John Tedro Date: Sun, 4 Apr 2021 11:09:39 +0200 Subject: [PATCH 2/4] Release remote tasks that have been dropped --- tokio/src/runtime/basic_scheduler.rs | 91 ++++++++++++++++++++++------ tokio/tests/task_abort.rs | 38 +++++++++++- 2 files changed, 108 insertions(+), 21 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 1541d4245a3..0ee69c42c7f 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -11,6 +11,7 @@ use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; +use std::ptr::NonNull; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; @@ -65,10 +66,26 @@ struct Tasks { queue: VecDeque>>, } +/// A remote scheduler entry. +/// +/// These are filled in by remote threads sending instructions to the scheduler. +enum Entry { + /// A remote thread wants to spawn a task. + Schedule(task::Notified>), + /// A remote thread wants a task to be released by the scheduler. We only + /// have access to its header. + Release(NonNull), +} + +// Safety: Used correctly, the task header is "thread safe". Ultimately the task +// is owned by the current thread executor, for which this instruction is being +// sent. +unsafe impl Send for Entry {} + /// Scheduler state shared between threads. struct Shared { /// Remote run queue - queue: Mutex>>>, + queue: Mutex>, /// Unpark the blocked thread unpark: Box, @@ -203,22 +220,27 @@ impl Inner

{ let tick = scheduler.tick; scheduler.tick = scheduler.tick.wrapping_add(1); - let next = if tick % REMOTE_FIRST_INTERVAL == 0 { - scheduler - .spawner - .pop() - .or_else(|| context.tasks.borrow_mut().queue.pop_front()) + let entry = if tick % REMOTE_FIRST_INTERVAL == 0 { + scheduler.spawner.pop().or_else(|| { + context + .tasks + .borrow_mut() + .queue + .pop_front() + .map(Entry::Schedule) + }) } else { context .tasks .borrow_mut() .queue .pop_front() + .map(Entry::Schedule) .or_else(|| scheduler.spawner.pop()) }; - match next { - Some(task) => crate::coop::budget(|| task.run()), + let entry = match entry { + Some(entry) => entry, None => { // Park until the thread is signaled scheduler.park.park().ok().expect("failed to park"); @@ -226,6 +248,29 @@ impl Inner

{ // Try polling the `block_on` future next continue 'outer; } + }; + + match entry { + Entry::Schedule(task) => crate::coop::budget(|| task.run()), + Entry::Release(ptr) => { + // Safety: the task header is only legally provided + // internally in the header, so we know that it is a + // valid (or in particular *allocated*) header that + // is part of the linked list. + unsafe { + let removed = context.tasks.borrow_mut().owned.remove(ptr); + + // TODO: This seems like it should hold, because + // there doesn't seem to be an avenue for anyone + // else to fiddle with the owned tasks + // collection *after* a remote thread has marked + // it as released, and at that point, the only + // location at which it can be removed is here + // or in the Drop implementation of the + // scheduler. + debug_assert!(removed.is_some()); + } + } } } @@ -308,8 +353,16 @@ impl Drop for BasicScheduler

{ } // Drain remote queue - for task in scheduler.spawner.shared.queue.lock().drain(..) { - task.shutdown(); + for entry in scheduler.spawner.shared.queue.lock().drain(..) { + match entry { + Entry::Schedule(task) => { + task.shutdown(); + } + Entry::Release(..) => { + // Do nothing, each entry in the linked list was *just* + // dropped by the scheduler above. + } + } } assert!(context.tasks.borrow().owned.is_empty()); @@ -337,7 +390,7 @@ impl Spawner { handle } - fn pop(&self) -> Option>> { + fn pop(&self) -> Option { self.shared.queue.lock().pop_front() } @@ -370,16 +423,18 @@ impl Schedule for Arc { } fn release(&self, task: &Task) -> Option> { - use std::ptr::NonNull; - CURRENT.with(|maybe_cx| { + let ptr = NonNull::from(task.header()); + if let Some(cx) = maybe_cx { // safety: the task is inserted in the list in `bind`. - unsafe { - let ptr = NonNull::from(task.header()); - cx.tasks.borrow_mut().owned.remove(ptr) - } + unsafe { cx.tasks.borrow_mut().owned.remove(ptr) } } else { + self.queue.lock().push_back(Entry::Release(ptr)); + self.unpark.unpark(); + // Returning `None` here prevents the task plumbing from being + // freed. It is then up to the scheduler through the queue we + // just added to, or its Drop impl to free the task. None } }) @@ -391,7 +446,7 @@ impl Schedule for Arc { cx.tasks.borrow_mut().queue.push_back(task); } _ => { - self.queue.lock().push_back(task); + self.queue.lock().push_back(Entry::Schedule(task)); self.unpark.unpark(); } }); diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index 3b9e464313f..05b59022a48 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -32,6 +32,7 @@ fn test_abort_without_panic_3157() { fn test_abort_without_panic_3662() { use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; + use std::task::Poll; struct DropCheck(Arc); @@ -48,20 +49,26 @@ fn test_abort_without_panic_3662() { rt.block_on(async move { let drop_flag = Arc::new(AtomicBool::new(false)); - let drop_flag2 = drop_flag.clone(); + let drop_check = DropCheck(drop_flag.clone()); let j = tokio::spawn(async move { - let drop_check = DropCheck(drop_flag2); + // NB: just grab the drop check here so that it becomes part of the + // task. + let _drop_check = drop_check; futures::future::pending::<()>().await; - drop(drop_check); }); + let drop_flag2 = drop_flag.clone(); + let task = tokio::task::spawn_blocking(move || { // This runs in a separate thread so it doesn't have immediate // thread-local access to the executor. It does however transition // the underlying task to be completed, which will cause it to be // dropped (in this thread no less). + assert!(!drop_flag2.load(Ordering::SeqCst)); j.abort(); + // TODO: is this guaranteed at this point? + // assert!(drop_flag2.load(Ordering::SeqCst)); j }) .await @@ -70,5 +77,30 @@ fn test_abort_without_panic_3662() { assert!(drop_flag.load(Ordering::SeqCst)); let result = task.await; assert!(result.unwrap_err().is_cancelled()); + + // Note: We do the following to trigger a deferred task cleanup. + // + // The relevant piece of code you want to look at is in: + // `Inner::block_on` of `basic_scheduler.rs`. + // + // We cause the cleanup to happen by having a poll return Pending once + // so that the scheduler can go into the "auxilliary tasks" mode, at + // which point the task is removed from the scheduler. + let i = tokio::spawn(async move { + let mut first = true; + + let task = futures::future::poll_fn(|cx| { + if std::mem::take(&mut first) { + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(()) + } + }); + + task.await; + }); + + i.await.unwrap(); }); } From 45878562e4ef783510f7bb232c4df49911ed9d62 Mon Sep 17 00:00:00 2001 From: John-John Tedro Date: Sun, 4 Apr 2021 19:29:27 +0200 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Alice Ryhl --- tokio/tests/task_abort.rs | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index 05b59022a48..78ef06128ff 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -32,7 +32,6 @@ fn test_abort_without_panic_3157() { fn test_abort_without_panic_3662() { use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; - use std::task::Poll; struct DropCheck(Arc); @@ -43,7 +42,6 @@ fn test_abort_without_panic_3662() { } let rt = tokio::runtime::Builder::new_current_thread() - .worker_threads(1) .build() .unwrap(); @@ -55,12 +53,12 @@ fn test_abort_without_panic_3662() { // NB: just grab the drop check here so that it becomes part of the // task. let _drop_check = drop_check; - futures::future::pending::<()>().await; + std::future::pending::<()>().await; }); let drop_flag2 = drop_flag.clone(); - let task = tokio::task::spawn_blocking(move || { + let task = std::thread::spawn(move || { // This runs in a separate thread so it doesn't have immediate // thread-local access to the executor. It does however transition // the underlying task to be completed, which will cause it to be @@ -71,7 +69,7 @@ fn test_abort_without_panic_3662() { // assert!(drop_flag2.load(Ordering::SeqCst)); j }) - .await + .join() .unwrap(); assert!(drop_flag.load(Ordering::SeqCst)); @@ -87,18 +85,7 @@ fn test_abort_without_panic_3662() { // so that the scheduler can go into the "auxilliary tasks" mode, at // which point the task is removed from the scheduler. let i = tokio::spawn(async move { - let mut first = true; - - let task = futures::future::poll_fn(|cx| { - if std::mem::take(&mut first) { - cx.waker().wake_by_ref(); - Poll::Pending - } else { - Poll::Ready(()) - } - }); - - task.await; + tokio::task::yield_now().await; }); i.await.unwrap(); From ae4e3d968b79e94deda998c8c754edc79ff17766 Mon Sep 17 00:00:00 2001 From: John-John Tedro Date: Sun, 4 Apr 2021 19:42:54 +0200 Subject: [PATCH 4/4] Revert to futures::future::pending to stay within MSRV --- tokio/tests/task_abort.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index 78ef06128ff..1d72ac3a25c 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -53,7 +53,7 @@ fn test_abort_without_panic_3662() { // NB: just grab the drop check here so that it becomes part of the // task. let _drop_check = drop_check; - std::future::pending::<()>().await; + futures::future::pending::<()>().await; }); let drop_flag2 = drop_flag.clone();