New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Immediately drop new task when runtime is shut down #3752
Changes from all commits
f387080
96bfefe
688f83e
b5947a4
bac93e0
4725042
63e54c4
85d40b7
0bc95a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -84,13 +84,13 @@ unsafe impl Send for Entry {} | |
|
||
/// Scheduler state shared between threads. | ||
struct Shared { | ||
/// Remote run queue | ||
queue: Mutex<VecDeque<Entry>>, | ||
/// Remote run queue. None if the `Runtime` has been dropped. | ||
queue: Mutex<Option<VecDeque<Entry>>>, | ||
|
||
/// Unpark the blocked thread | ||
/// Unpark the blocked thread. | ||
unpark: Box<dyn Unpark>, | ||
|
||
// indicates whether the blocked on thread was woken | ||
/// Indicates whether the blocked on thread was woken. | ||
woken: AtomicBool, | ||
} | ||
|
||
|
@@ -124,7 +124,7 @@ impl<P: Park> BasicScheduler<P> { | |
|
||
let spawner = Spawner { | ||
shared: Arc::new(Shared { | ||
queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), | ||
queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), | ||
unpark: unpark as Box<dyn Unpark>, | ||
woken: AtomicBool::new(false), | ||
}), | ||
|
@@ -351,18 +351,29 @@ impl<P: Park> Drop for BasicScheduler<P> { | |
task.shutdown(); | ||
} | ||
|
||
// Drain remote queue | ||
for entry in scheduler.spawner.shared.queue.lock().drain(..) { | ||
match entry { | ||
Entry::Schedule(task) => { | ||
task.shutdown(); | ||
} | ||
Entry::Release(..) => { | ||
// Do nothing, each entry in the linked list was *just* | ||
// dropped by the scheduler above. | ||
// Drain remote queue and set it to None | ||
let mut remote_queue = scheduler.spawner.shared.queue.lock(); | ||
|
||
// Using `Option::take` to replace the shared queue with `None`. | ||
if let Some(remote_queue) = remote_queue.take() { | ||
for entry in remote_queue { | ||
match entry { | ||
Entry::Schedule(task) => { | ||
task.shutdown(); | ||
} | ||
Entry::Release(..) => { | ||
// Do nothing, each entry in the linked list was *just* | ||
// dropped by the scheduler above. | ||
} | ||
} | ||
} | ||
} | ||
// By dropping the mutex lock after the full duration of the above loop, | ||
// any thread that sees the queue in the `None` state is guaranteed that | ||
// the runtime has fully shut down. | ||
// | ||
// The assert below is unrelated to this mutex. | ||
drop(remote_queue); | ||
|
||
assert!(context.tasks.borrow().owned.is_empty()); | ||
}); | ||
|
@@ -390,7 +401,10 @@ impl Spawner { | |
} | ||
|
||
fn pop(&self) -> Option<Entry> { | ||
self.shared.queue.lock().pop_front() | ||
match self.shared.queue.lock().as_mut() { | ||
Some(queue) => queue.pop_front(), | ||
None => None, | ||
} | ||
} | ||
|
||
fn waker_ref(&self) -> WakerRef<'_> { | ||
|
@@ -429,7 +443,19 @@ impl Schedule for Arc<Shared> { | |
// safety: the task is inserted in the list in `bind`. | ||
unsafe { cx.tasks.borrow_mut().owned.remove(ptr) } | ||
} else { | ||
self.queue.lock().push_back(Entry::Release(ptr)); | ||
// By sending an `Entry::Release` to the runtime, we ask the | ||
// runtime to remove this task from the linked list in | ||
// `Tasks::owned`. | ||
// | ||
// If the queue is `None`, then the task was already removed | ||
// from that list in the destructor of `BasicScheduler`. We do | ||
// not do anything in this case for the same reason that | ||
// `Entry::Release` messages are ignored in the remote queue | ||
// drain loop of `BasicScheduler`'s destructor. | ||
if let Some(queue) = self.queue.lock().as_mut() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So what happens if this is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After digging in, this looks fine, a comment explaining why would be nice. |
||
queue.push_back(Entry::Release(ptr)); | ||
} | ||
|
||
self.unpark.unpark(); | ||
// Returning `None` here prevents the task plumbing from being | ||
// freed. It is then up to the scheduler through the queue we | ||
|
@@ -445,8 +471,17 @@ impl Schedule for Arc<Shared> { | |
cx.tasks.borrow_mut().queue.push_back(task); | ||
} | ||
_ => { | ||
self.queue.lock().push_back(Entry::Schedule(task)); | ||
self.unpark.unpark(); | ||
let mut guard = self.queue.lock(); | ||
if let Some(queue) = guard.as_mut() { | ||
queue.push_back(Entry::Schedule(task)); | ||
drop(guard); | ||
self.unpark.unpark(); | ||
} else { | ||
// The runtime has shut down. We drop the new task | ||
// immediately. | ||
drop(guard); | ||
task.shutdown(); | ||
} | ||
} | ||
}); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
use crate::runtime::{Builder, Handle}; | ||
|
||
#[test] | ||
fn join_handle_cancel_on_shutdown() { | ||
let mut builder = loom::model::Builder::new(); | ||
builder.preemption_bound = Some(2); | ||
builder.check(|| { | ||
use futures::future::FutureExt; | ||
|
||
let rt = Builder::new_multi_thread() | ||
.worker_threads(2) | ||
.build() | ||
.unwrap(); | ||
|
||
let handle = rt.block_on(async move { Handle::current() }); | ||
|
||
let jh1 = handle.spawn(futures::future::pending::<()>()); | ||
|
||
drop(rt); | ||
|
||
let jh2 = handle.spawn(futures::future::pending::<()>()); | ||
|
||
let err1 = jh1.now_or_never().unwrap().unwrap_err(); | ||
let err2 = jh2.now_or_never().unwrap().unwrap_err(); | ||
assert!(err1.is_cancelled()); | ||
assert!(err2.is_cancelled()); | ||
}); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -388,6 +388,28 @@ rt_test! { | |
|
||
rt.block_on(async { some_non_async_function() }); | ||
} | ||
|
||
#[test] | ||
fn spawn_after_runtime_dropped() { | ||
use futures::future::FutureExt; | ||
|
||
let rt = rt(); | ||
|
||
let handle = rt.block_on(async move { | ||
Handle::current() | ||
}); | ||
|
||
let jh1 = handle.spawn(futures::future::pending::<()>()); | ||
|
||
drop(rt); | ||
|
||
let jh2 = handle.spawn(futures::future::pending::<()>()); | ||
|
||
let err1 = jh1.now_or_never().unwrap().unwrap_err(); | ||
let err2 = jh2.now_or_never().unwrap().unwrap_err(); | ||
Comment on lines
+402
to
+409
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @carllerche The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is fixed by the "Fix loom test" commit |
||
assert!(err1.is_cancelled()); | ||
assert!(err2.is_cancelled()); | ||
} | ||
} | ||
|
||
multi_threaded_rt_test! { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the drop here is significant (which it probably is because this is a lock guard), it probably is worth adding a comment saying so to avoid removing it in the future by accident.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the drop here in interact with the assert below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I only put it in to make it more visually clear where the lock is dropped.