rt: avoid early task shutdown (#3870)
Tokio 1.7.0 introduced a change intended to eagerly shut down newly
spawned tasks if the runtime is in the process of shutting down.
However, it introduced a bug where already-spawned tasks could be
shut down too early, potentially resulting in deadlocks when tasks
acquired mutexes in their drop handlers.

Fixes #3869
carllerche committed Jun 18, 2021
1 parent 34c6a26 commit 7601dc6
Showing 7 changed files with 116 additions and 20 deletions.
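To make the failure mode concrete, here is a minimal, hypothetical sketch (not code from this commit; the real scenario is exercised by the regression test added to `tokio/tests/rt_threaded.rs` below). The crux is that `std::sync::Mutex` is not reentrant, so shutting a task down while its lock is already held on the same thread hangs forever:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical illustration of the hazard: a destructor that takes a
// lock, dropped while the same thread already holds that lock. In tokio
// 1.7.0, an early task shutdown could trigger this shape: a wake
// performed inside another task's `Drop` (with a lock held) shut the
// woken task down synchronously on the same thread.
struct TakesLockOnDrop {
    shared: Arc<Mutex<u32>>,
}

impl Drop for TakesLockOnDrop {
    fn drop(&mut self) {
        // Deadlocks if the current thread already holds `shared`.
        *self.shared.lock().unwrap() += 1;
    }
}

fn main() {
    let shared = Arc::new(Mutex::new(0));
    let task = TakesLockOnDrop { shared: shared.clone() };

    let guard = shared.lock().unwrap();
    // Calling `drop(task)` here would never return, mirroring the
    // reported deadlock. Release the lock first and the drop is fine:
    drop(guard);
    drop(task);

    assert_eq!(*shared.lock().unwrap(), 1);
}
```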
8 changes: 8 additions & 0 deletions tokio/CHANGELOG.md
@@ -1,3 +1,11 @@
# 1.7.1 (June 18, 2021)

### Fixed

- runtime: fix early task shutdown during runtime shutdown ([#3870])

[#3870]: https://github.com/tokio-rs/tokio/pull/3870

# 1.7.0 (June 15, 2021)

### Added
4 changes: 2 additions & 2 deletions tokio/Cargo.toml
@@ -7,12 +7,12 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.0.x" git tag.
version = "1.7.0"
version = "1.7.1"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
readme = "README.md"
documentation = "https://docs.rs/tokio/1.7.0/tokio/"
documentation = "https://docs.rs/tokio/1.7.1/tokio/"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
description = """
1 change: 1 addition & 0 deletions tokio/src/lib.rs
@@ -9,6 +9,7 @@
    rust_2018_idioms,
    unreachable_pub
)]
#![deny(unused_must_use)]
#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
#![doc(test(
    no_crate_inject,
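The new lint matters for the `Result`-returning `Inject::push` introduced below: `Result` is `#[must_use]`, so with `unused_must_use` denied, every caller must either handle the error or discard it explicitly. A minimal sketch of the mechanism:

```rust
#![deny(unused_must_use)]

fn fallible() -> Result<(), ()> {
    Err(())
}

fn main() {
    // Writing `fallible();` alone would fail to compile here, because
    // `Result` is `#[must_use]` and the lint is a hard error. Discarding
    // the value must be spelled out:
    let _ = fallible();
}
```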
24 changes: 15 additions & 9 deletions tokio/src/runtime/queue.rs
@@ -124,9 +124,14 @@ impl<T> Local<T> {
                // There is capacity for the task
                break tail;
            } else if steal != real {
                // Concurrently stealing, this will free up capacity, so
                // only push the new task onto the inject queue
                inject.push(task);
                // Concurrently stealing, this will free up capacity, so only
                // push the new task onto the inject queue
                //
                // If the task fails to be pushed onto the injection queue, there
                // is nothing to be done at this point, as the task cannot be a
                // newly spawned task. Shutting down this task is handled by the
                // worker shutdown process.
                let _ = inject.push(task);
                return;
            } else {
                // Push the current task and half of the queue into the
@@ -507,19 +512,19 @@ impl<T: 'static> Inject<T> {
    }

    /// Pushes a value into the queue.
    pub(super) fn push(&self, task: task::Notified<T>)
    ///
    /// Returns `Err(task)` if pushing fails due to the queue being shut down.
    /// The caller is expected to call `shutdown()` on the task **if and only
    /// if** it is a newly spawned task.
    pub(super) fn push(&self, task: task::Notified<T>) -> Result<(), task::Notified<T>>
    where
        T: crate::runtime::task::Schedule,
    {
        // Acquire queue lock
        let mut p = self.pointers.lock();

        if p.is_closed {
            // Drop the mutex to avoid a potential deadlock when
            // re-entering.
            drop(p);
            task.shutdown();
            return;
            return Err(task);
        }

        // safety: only mutated with the lock held
@@ -538,6 +543,7 @@ impl<T: 'static> Inject<T> {
        p.tail = Some(task);

        self.len.store(len + 1, Release);
        Ok(())
    }

    pub(super) fn push_batch(
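The contract of the new `push` is that the *caller* decides whether a rejected task must be shut down. A self-contained toy version for illustration (`ToyInject` and `ToyTask` are invented names, not tokio APIs):

```rust
use std::sync::Mutex;

// Toy stand-ins (not tokio APIs) showing the new contract: `push`
// hands a rejected task back instead of shutting it down itself.
struct ToyTask(&'static str);

impl ToyTask {
    fn shutdown(self) {
        println!("shutting down {}", self.0);
    }
}

struct ToyInject {
    closed: bool,
    queue: Mutex<Vec<ToyTask>>,
}

impl ToyInject {
    fn push(&self, task: ToyTask) -> Result<(), ToyTask> {
        if self.closed {
            // Hand the task back; only the caller knows whether it is
            // newly spawned and therefore must be shut down here.
            return Err(task);
        }
        self.queue.lock().unwrap().push(task);
        Ok(())
    }
}

fn main() {
    let inject = ToyInject { closed: true, queue: Mutex::new(Vec::new()) };

    // Newly spawned task: the caller shuts it down on failure.
    if let Err(task) = inject.push(ToyTask("new")) {
        task.shutdown();
    }

    // Already-running task: the error is deliberately ignored, since
    // worker shutdown will reap it (mirrors the `let _ =` call sites).
    let _ = inject.push(ToyTask("existing"));
}
```

This mirrors the two call-site patterns in the files below: `Spawner::spawn` shuts the task down on `Err`, while the worker's `schedule` and `yield_now` discard the error.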
8 changes: 7 additions & 1 deletion tokio/src/runtime/thread_pool/mod.rs
@@ -94,7 +94,13 @@ impl Spawner {
        F::Output: Send + 'static,
    {
        let (task, handle) = task::joinable(future);
        self.shared.schedule(task, false);

        if let Err(task) = self.shared.schedule(task, false) {
            // The newly spawned task could not be scheduled because the runtime
            // is shutting down. The task must be explicitly shut down at this point.
            task.shutdown();
        }

        handle
    }

19 changes: 13 additions & 6 deletions tokio/src/runtime/thread_pool/worker.rs
@@ -709,32 +709,39 @@ impl task::Schedule for Arc<Worker> {
    }

    fn schedule(&self, task: Notified) {
        self.shared.schedule(task, false);
        // Because this is not a newly spawned task, if scheduling fails due to
        // the runtime shutting down, there is no special work that must happen
        // here.
        let _ = self.shared.schedule(task, false);
    }

    fn yield_now(&self, task: Notified) {
        self.shared.schedule(task, true);
        // Because this is not a newly spawned task, if scheduling fails due to
        // the runtime shutting down, there is no special work that must happen
        // here.
        let _ = self.shared.schedule(task, true);
    }
}

impl Shared {
    pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
    pub(super) fn schedule(&self, task: Notified, is_yield: bool) -> Result<(), Notified> {
        CURRENT.with(|maybe_cx| {
            if let Some(cx) = maybe_cx {
                // Make sure the task is part of the **current** scheduler.
                if self.ptr_eq(&cx.worker.shared) {
                    // And the current thread still holds a core
                    if let Some(core) = cx.core.borrow_mut().as_mut() {
                        self.schedule_local(core, task, is_yield);
                        return;
                        return Ok(());
                    }
                }
            }

            // Otherwise, use the inject queue
            self.inject.push(task);
            self.inject.push(task)?;
            self.notify_parked();
        });
            Ok(())
        })
    }

    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
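A detail worth noting in `Shared::schedule` above: the `?` operator early-returns from the closure passed to `CURRENT.with`, and `with` returns the closure's value, so the `Result` propagates out through the thread-local access. A self-contained sketch of the same shape (the thread-local and the types here are invented for illustration):

```rust
use std::cell::Cell;

thread_local! {
    // Invented stand-in for tokio's `CURRENT` worker context.
    static ACTIVE: Cell<bool> = Cell::new(false);
}

fn fallible_push(task: u32) -> Result<(), u32> {
    Err(task) // pretend the queue is closed
}

fn schedule(task: u32) -> Result<(), u32> {
    ACTIVE.with(|active| {
        if active.get() {
            // Fast path: handled locally, nothing to propagate.
            return Ok(());
        }
        // `?` returns early *from the closure*; `with` then yields that
        // `Err`, which `schedule` returns in turn.
        fallible_push(task)?;
        Ok(())
    })
}

fn main() {
    assert_eq!(schedule(7), Err(7));
}
```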
72 changes: 70 additions & 2 deletions tokio/tests/rt_threaded.rs
@@ -12,8 +12,8 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll};
use std::sync::{mpsc, Arc, Mutex};
use std::task::{Context, Poll, Waker};

#[test]
fn single_thread() {
@@ -405,6 +405,74 @@ async fn hang_on_shutdown() {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

/// Demonstrates tokio-rs/tokio#3869: one task's destructor wakes another task
/// while holding a shared mutex; if the wake shuts the woken task down
/// synchronously, that task's destructor deadlocks on the same mutex.
#[test]
fn wake_during_shutdown() {
    struct Shared {
        waker: Option<Waker>,
    }

    struct MyFuture {
        shared: Arc<Mutex<Shared>>,
        put_waker: bool,
    }

    impl MyFuture {
        fn new() -> (Self, Self) {
            let shared = Arc::new(Mutex::new(Shared { waker: None }));
            let f1 = MyFuture {
                shared: shared.clone(),
                put_waker: true,
            };
            let f2 = MyFuture {
                shared,
                put_waker: false,
            };
            (f1, f2)
        }
    }

    impl Future for MyFuture {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            let me = Pin::into_inner(self);
            let mut lock = me.shared.lock().unwrap();
            println!("poll {}", me.put_waker);
            if me.put_waker {
                println!("putting");
                lock.waker = Some(cx.waker().clone());
            }
            Poll::Pending
        }
    }

    impl Drop for MyFuture {
        fn drop(&mut self) {
            println!("drop {} start", self.put_waker);
            let mut lock = self.shared.lock().unwrap();
            if !self.put_waker {
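                // Wake the parked task while still holding the shared lock. With
                // the tokio 1.7.0 bug, this wake could shut the woken task down
                // synchronously on this thread, running its destructor, which
                // takes the same lock: deadlock.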
                lock.waker.take().unwrap().wake();
            }
            drop(lock);
            println!("drop {} stop", self.put_waker);
        }
    }

    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let (f1, f2) = MyFuture::new();

    rt.spawn(f1);
    rt.spawn(f2);

    rt.block_on(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await });
}

fn rt() -> Runtime {
    Runtime::new().unwrap()
}
