Skip to content

Commit

Permalink
Fix basic_scheduler deadlock when waking during drop
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Jan 6, 2020
1 parent 5930ace commit def8678
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 13 deletions.
33 changes: 20 additions & 13 deletions tokio/src/task/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,19 +261,26 @@ where
/// If the mutex around the remote queue is poisoned _and_ the current
/// thread is not already panicking. This is safe to call in a `Drop` impl.
fn close_remote(&self) {
// NOTE(review): this span is a unified diff, not straight-line source.
// The first hunk (down to the `while let` loop) is the PRE-fix body that
// was removed; the second hunk (the `loop { ... }`) is the POST-fix body
// that replaced it. Both appear here because this is a scraped commit view.
//
// Pre-fix (removed): the remote-queue mutex guard `lock` stayed held
// across `task.shutdown()`. If a task's Drop tried to re-schedule — i.e.
// re-acquire `self.remote_queue` — it deadlocked (per the commit title,
// "Fix basic_scheduler deadlock when waking during drop").
#[allow(clippy::match_wild_err_arm)]
let mut lock = match self.remote_queue.lock() {
// If the lock is poisoned, but the thread is already panicking,
// avoid a double panic. This is necessary since this fn can be
// called in a drop impl.
Err(_) if std::thread::panicking() => return,
Err(_) => panic!("mutex poisoned"),
Ok(lock) => lock,
};
lock.open = false;

while let Some(task) = lock.queue.pop_front() {
task.shutdown();
// Post-fix (added): re-acquire the lock on every iteration and
// explicitly `drop(lock)` BEFORE shutting a task down, so a Drop impl
// that re-schedules can take the mutex without deadlocking. `open` is
// re-set to false each pass; a task re-spawned during shutdown lands in
// the (now closed) queue and is drained on a later iteration.
loop {
#[allow(clippy::match_wild_err_arm)]
let mut lock = match self.remote_queue.lock() {
// If the lock is poisoned, but the thread is already panicking,
// avoid a double panic. This is necessary since this fn can be
// called in a drop impl.
Err(_) if std::thread::panicking() => return,
Err(_) => panic!("mutex poisoned"),
Ok(lock) => lock,
};
lock.open = false;

if let Some(task) = lock.queue.pop_front() {
// Release lock before dropping task, in case
// task tries to re-schedule in its Drop.
drop(lock);
task.shutdown();
} else {
return;
}
}
}

Expand Down
59 changes: 59 additions & 0 deletions tokio/tests/rt_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,65 @@ fn acquire_mutex_in_drop() {
drop(rt);
}

#[test]
// Regression test for the basic_scheduler deadlock fixed in this commit:
// dropping the runtime drains remote tasks while holding the remote-queue
// mutex; a task whose Drop re-schedules (here, via `h1.spawn`) would then
// try to re-acquire that mutex and deadlock. The three oneshot channels
// choreograph a state where a stored waker fires during runtime drop.
fn wake_while_rt_is_dropping() {
use tokio::task;

// Runs the wrapped closure when dropped — used to trigger work from
// inside a task's Drop, which is exactly the problematic path.
struct OnDrop<F: FnMut()>(F);

impl<F: FnMut()> Drop for OnDrop<F> {
fn drop(&mut self) {
(self.0)()
}
}

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();

let mut rt = rt();

// Handle cloned so task 2's Drop closure can spawn onto the same runtime
// while the runtime itself is being dropped.
let h1 = rt.handle().clone();

rt.handle().spawn(async move {
// Ensure a waker gets stored in oneshot 1.
let _ = rx1.await;
tx3.send(()).unwrap();
});

rt.handle().spawn(async move {
// When this task is dropped, we'll be "closing remotes".
// We spawn a new task that owns the `tx1`, to move its Drop
// out of here.
//
// Importantly, the oneshot 1 has a waker already stored, so
// the eventual drop here will try to re-schedule again.
let mut opt_tx1 = Some(tx1);
let _d = OnDrop(move || {
let tx1 = opt_tx1.take().unwrap();
h1.spawn(async move {
tx1.send(()).unwrap();
});
});
let _ = rx2.await;
});

rt.handle().spawn(async move {
let _ = rx3.await;
// We'll never get here, but once task 3 drops, this will
// force task 2 to re-schedule since it's waiting on oneshot 2.
tx2.send(()).unwrap();
});

// Tick the loop
// One yield lets the spawned tasks run to their first await point (so
// task 1's waker is actually registered in oneshot 1) before we drop.
rt.block_on(async {
task::yield_now().await;
});

// Drop the rt
// Pre-fix this deadlocked in close_remote(); post-fix it completes.
drop(rt);
}

fn rt() -> Runtime {
tokio::runtime::Builder::new()
.basic_scheduler()
Expand Down

0 comments on commit def8678

Please sign in to comment.