Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Immediately drop new task when runtime is shut down #3752

Merged
merged 9 commits into from May 26, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 35 additions & 17 deletions tokio/src/runtime/basic_scheduler.rs
Expand Up @@ -84,13 +84,13 @@ unsafe impl Send for Entry {}

/// Scheduler state shared between threads.
struct Shared {
/// Remote run queue
queue: Mutex<VecDeque<Entry>>,
/// Remote run queue. None if the `Runtime` has been dropped.
queue: Mutex<Option<VecDeque<Entry>>>,

/// Unpark the blocked thread
/// Unpark the blocked thread.
unpark: Box<dyn Unpark>,

// indicates whether the blocked on thread was woken
/// Indicates whether the blocked on thread was woken.
woken: AtomicBool,
}

Expand Down Expand Up @@ -124,7 +124,7 @@ impl<P: Park> BasicScheduler<P> {

let spawner = Spawner {
shared: Arc::new(Shared {
queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
unpark: unpark as Box<dyn Unpark>,
woken: AtomicBool::new(false),
}),
Expand Down Expand Up @@ -352,17 +352,21 @@ impl<P: Park> Drop for BasicScheduler<P> {
}

// Drain remote queue
for entry in scheduler.spawner.shared.queue.lock().drain(..) {
match entry {
Entry::Schedule(task) => {
task.shutdown();
}
Entry::Release(..) => {
// Do nothing, each entry in the linked list was *just*
// dropped by the scheduler above.
let mut remote_queue = scheduler.spawner.shared.queue.lock();
if let Some(remote_queue) = remote_queue.take() {
for entry in remote_queue {
match entry {
Entry::Schedule(task) => {
task.shutdown();
}
Entry::Release(..) => {
// Do nothing, each entry in the linked list was *just*
// dropped by the scheduler above.
}
}
}
}
drop(remote_queue);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the drop here is significant (which it probably is because this is a lock guard), it probably is worth adding a comment saying so to avoid removing it in the future by accident.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the drop here in interact with the assert below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I only put it in to make it more visually clear where the lock is dropped.


assert!(context.tasks.borrow().owned.is_empty());
});
Expand Down Expand Up @@ -390,7 +394,10 @@ impl Spawner {
}

fn pop(&self) -> Option<Entry> {
self.shared.queue.lock().pop_front()
match self.shared.queue.lock().as_mut() {
Some(queue) => queue.pop_front(),
None => None,
}
}

fn waker_ref(&self) -> WakerRef<'_> {
Expand Down Expand Up @@ -429,7 +436,9 @@ impl Schedule for Arc<Shared> {
// safety: the task is inserted in the list in `bind`.
unsafe { cx.tasks.borrow_mut().owned.remove(ptr) }
} else {
self.queue.lock().push_back(Entry::Release(ptr));
if let Some(queue) = self.queue.lock().as_mut() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what happens if this is None? Will the task leak?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After digging in, this looks fine, a comment explaining why would be nice.

queue.push_back(Entry::Release(ptr));
}
self.unpark.unpark();
// Returning `None` here prevents the task plumbing from being
// freed. It is then up to the scheduler through the queue we
Expand All @@ -445,8 +454,17 @@ impl Schedule for Arc<Shared> {
cx.tasks.borrow_mut().queue.push_back(task);
}
_ => {
self.queue.lock().push_back(Entry::Schedule(task));
self.unpark.unpark();
let mut guard = self.queue.lock();
if let Some(queue) = guard.as_mut() {
queue.push_back(Entry::Schedule(task));
drop(guard);
self.unpark.unpark();
} else {
// The runtime has shut down. We drop the new task
// immediately.
drop(guard);
task.shutdown();
}
}
});
}
Expand Down
22 changes: 22 additions & 0 deletions tokio/tests/rt_handle_block_on.rs
Expand Up @@ -388,6 +388,28 @@ rt_test! {

rt.block_on(async { some_non_async_function() });
}

#[test]
fn spawn_after_runtime_dropped() {
use futures::future::FutureExt;

let rt = rt();

let handle = rt.block_on(async move {
Handle::current()
});

let jh1 = handle.spawn(std::future::pending::<()>());

drop(rt);

let jh2 = handle.spawn(std::future::pending::<()>());

let err1 = jh1.now_or_never().unwrap().unwrap_err();
let err2 = jh2.now_or_never().unwrap().unwrap_err();
assert!(err1.is_cancelled());
assert!(err2.is_cancelled());
}
}

multi_threaded_rt_test! {
Expand Down