rt: avoid early task shutdown #3870

Merged · 2 commits · Jun 18, 2021
1 change: 1 addition & 0 deletions tokio/src/lib.rs
@@ -9,6 +9,7 @@
rust_2018_idioms,
unreachable_pub
)]
#![deny(unused_must_use)]
Member Author:
IMO, any unhandled Result values should result in a compilation failure.
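(Illustration, not part of the diff.) A minimal sketch of what the lint enforces, using a made-up `try_push` helper in a standalone crate:

```rust
#![deny(unused_must_use)]

fn try_push(value: i32) -> Result<(), i32> {
    if value < 0 {
        Err(value)
    } else {
        Ok(())
    }
}

fn main() {
    // try_push(1);      // with the lint denied, silently ignoring the Result fails to compile
    let _ = try_push(1); // explicitly discarding the Result is still allowed
}
```

This is also why the call sites below that intentionally ignore the new `Result` from `push` use `let _ = ...`.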

Member Author:
This is not directly related, so if needed, I can split it off to another PR.

#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
#![doc(test(
no_crate_inject,
24 changes: 15 additions & 9 deletions tokio/src/runtime/queue.rs
@@ -124,9 +124,14 @@ impl<T> Local<T> {
// There is capacity for the task
break tail;
} else if steal != real {
// Concurrently stealing, this will free up capacity, so
// only push the new task onto the inject queue
inject.push(task);
// Concurrently stealing, this will free up capacity, so only
// push the new task onto the inject queue
//
// If the task fails to be pushed onto the injection queue, there
// is nothing to be done at this point as the task cannot be a
// newly spawned task. Shutting down this task is handled by the
// worker shutdown process.
let _ = inject.push(task);
return;
} else {
// Push the current task and half of the queue into the
@@ -507,19 +512,19 @@ impl<T: 'static> Inject<T> {
}

/// Pushes a value into the queue.
pub(super) fn push(&self, task: task::Notified<T>)
///
/// Returns `Err(task)` if pushing fails due to the queue being shutdown.
/// The caller is expected to call `shutdown()` on the task **if and only
/// if** it is a newly spawned task.
pub(super) fn push(&self, task: task::Notified<T>) -> Result<(), task::Notified<T>>
where
T: crate::runtime::task::Schedule,
{
// Acquire queue lock
let mut p = self.pointers.lock();

if p.is_closed {
// Drop the mutex to avoid a potential deadlock when
// re-entering.
drop(p);
task.shutdown();
return;
return Err(task);
Member Author:

The fix is to push the task.shutdown() call out to a point where we know if the task is newly spawned or not.
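(Illustration only; made-up types, not tokio's actual API.) A standalone sketch of the pattern described above: the queue refuses the task by handing ownership back in `Err`, and only the call site that knows the task is newly spawned shuts it down, while other call sites can ignore the error because worker shutdown cleans the task up.

```rust
struct Task(&'static str);

impl Task {
    fn shutdown(self) {
        println!("shutting down {}", self.0);
    }
}

struct Queue {
    closed: bool,
}

impl Queue {
    /// Refuses the task when the queue is closed, returning it to the caller.
    fn push(&self, task: Task) -> Result<(), Task> {
        if self.closed {
            return Err(task);
        }
        // ... the real queue would enqueue the task here ...
        drop(task);
        Ok(())
    }
}

/// The spawn path is the only place that knows the task is freshly created,
/// so it is the only place that must shut the task down on failure.
fn spawn(queue: &Queue, task: Task) {
    if let Err(task) = queue.push(task) {
        task.shutdown();
    }
}

/// A re-schedule path can simply drop the refused task; worker shutdown handles it.
fn reschedule(queue: &Queue, task: Task) {
    let _ = queue.push(task);
}

fn main() {
    let queue = Queue { closed: true };
    spawn(&queue, Task("newly spawned task"));
    reschedule(&queue, Task("already running task"));
}
```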

}

// safety: only mutated with the lock held
@@ -538,6 +543,7 @@ impl<T: 'static> Inject<T> {
p.tail = Some(task);

self.len.store(len + 1, Release);
Ok(())
}

pub(super) fn push_batch(
8 changes: 7 additions & 1 deletion tokio/src/runtime/thread_pool/mod.rs
@@ -94,7 +94,13 @@ impl Spawner {
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
self.shared.schedule(task, false);

if let Err(task) = self.shared.schedule(task, false) {
// The newly spawned task could not be scheduled because the runtime
// is shutting down. The task must be explicitly shutdown at this point.
task.shutdown();
}

handle
}

19 changes: 13 additions & 6 deletions tokio/src/runtime/thread_pool/worker.rs
@@ -709,32 +709,39 @@ impl task::Schedule for Arc<Worker> {
}

fn schedule(&self, task: Notified) {
self.shared.schedule(task, false);
// Because this is not a newly spawned task, if scheduling fails due to
// the runtime shutting down, there is no special work that must happen
// here.
let _ = self.shared.schedule(task, false);
}

fn yield_now(&self, task: Notified) {
self.shared.schedule(task, true);
// Because this is not a newly spawned task, if scheduling fails due to
// the runtime shutting down, there is no special work that must happen
// here.
let _ = self.shared.schedule(task, true);
}
}

impl Shared {
pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
pub(super) fn schedule(&self, task: Notified, is_yield: bool) -> Result<(), Notified> {
CURRENT.with(|maybe_cx| {
if let Some(cx) = maybe_cx {
// Make sure the task is part of the **current** scheduler.
if self.ptr_eq(&cx.worker.shared) {
// And the current thread still holds a core
if let Some(core) = cx.core.borrow_mut().as_mut() {
self.schedule_local(core, task, is_yield);
return;
return Ok(());
}
}
}

// Otherwise, use the inject queue
self.inject.push(task);
self.inject.push(task)?;
self.notify_parked();
});
Ok(())
})
}

fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
72 changes: 70 additions & 2 deletions tokio/tests/rt_threaded.rs
@@ -12,8 +12,8 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll};
use std::sync::{mpsc, Arc, Mutex};
use std::task::{Context, Poll, Waker};

#[test]
fn single_thread() {
@@ -405,6 +405,74 @@ async fn hang_on_shutdown() {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

/// Demonstrates tokio-rs/tokio#3869
#[test]
fn wake_during_shutdown() {
struct Shared {
waker: Option<Waker>,
}

struct MyFuture {
shared: Arc<Mutex<Shared>>,
put_waker: bool,
}

impl MyFuture {
fn new() -> (Self, Self) {
let shared = Arc::new(Mutex::new(Shared { waker: None }));
let f1 = MyFuture {
shared: shared.clone(),
put_waker: true,
};
let f2 = MyFuture {
shared,
put_waker: false,
};
(f1, f2)
}
}

impl Future for MyFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let me = Pin::into_inner(self);
let mut lock = me.shared.lock().unwrap();
println!("poll {}", me.put_waker);
if me.put_waker {
println!("putting");
lock.waker = Some(cx.waker().clone());
}
Poll::Pending
}
}

impl Drop for MyFuture {
fn drop(&mut self) {
println!("drop {} start", self.put_waker);
let mut lock = self.shared.lock().unwrap();
if !self.put_waker {
lock.waker.take().unwrap().wake();
}
drop(lock);
println!("drop {} stop", self.put_waker);
}
Comment on lines +451 to +459:
To reproduce this bug, the task with put_waker == false (f2) has to be dropped first so that it wakes up f1.
On my laptop it seems that the last spawned task is always dropped first. Do we have any guarantee about this behavior, or documentation describing it? 🤔

Contributor:

I don't think there's any way to get any guarantees about the drop order.

}

let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();

let (f1, f2) = MyFuture::new();

rt.spawn(f1);
rt.spawn(f2);

rt.block_on(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await });
}

fn rt() -> Runtime {
Runtime::new().unwrap()
}