Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix leak in LocalSet #3978

Merged
merged 4 commits into from Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 47 additions & 0 deletions tokio/src/runtime/tests/loom_local.rs
@@ -0,0 +1,47 @@
use crate::runtime::tests::loom_oneshot as oneshot;
use crate::runtime::Builder;
use crate::task::LocalSet;

use std::task::Poll;

/// Waking a runtime will attempt to push a task into a queue of notifications
/// in the runtime, however the tasks in such a queue usually have a reference
/// to the runtime itself. This means that if they are not properly removed at
/// runtime shutdown, this will cause a memory leak.
///
/// This test verifies that waking something during shutdown of a LocalSet does
/// not result in tasks lingering in the queue once shutdown is complete. This
/// is verified using loom's leak finder.
#[test]
fn wake_during_shutdown() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be nice to have a comment explaining what the failure modes of this test are, for future reference? not a blocker though.

loom::model(|| {
let rt = Builder::new_current_thread().build().unwrap();
let ls = LocalSet::new();

let (send, recv) = oneshot::channel();

ls.spawn_local(async move {
let mut send = Some(send);

let () = futures::future::poll_fn(|cx| {
if let Some(send) = send.take() {
send.send(cx.waker().clone());
}

Poll::Pending
})
.await;
});

let handle = loom::thread::spawn(move || {
let waker = recv.recv();
waker.wake();
});

ls.block_on(&rt, crate::task::yield_now());

drop(ls);
handle.join().unwrap();
drop(rt);
});
}
1 change: 1 addition & 0 deletions tokio/src/runtime/tests/mod.rs
Expand Up @@ -30,6 +30,7 @@ mod unowned_wrapper {

cfg_loom! {
mod loom_basic_scheduler;
mod loom_local;
mod loom_blocking;
mod loom_oneshot;
mod loom_pool;
Expand Down
33 changes: 26 additions & 7 deletions tokio/src/task/local.rs
Expand Up @@ -241,7 +241,7 @@ struct Tasks {
/// LocalSet state shared between threads.
struct Shared {
/// Remote run queue sender
queue: Mutex<VecDeque<task::Notified<Arc<Shared>>>>,
queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,

/// Wake the `LocalSet` task
waker: AtomicWaker,
Expand Down Expand Up @@ -339,7 +339,7 @@ impl LocalSet {
queue: VecDeque::with_capacity(INITIAL_CAPACITY),
}),
shared: Arc::new(Shared {
queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
waker: AtomicWaker::new(),
}),
},
Expand Down Expand Up @@ -549,15 +549,23 @@ impl LocalSet {
.shared
.queue
.lock()
.pop_front()
.as_mut()
.and_then(|queue| queue.pop_front())
.or_else(|| self.context.tasks.borrow_mut().queue.pop_front())
} else {
self.context
.tasks
.borrow_mut()
.queue
.pop_front()
.or_else(|| self.context.shared.queue.lock().pop_front())
.or_else(|| {
self.context
.shared
.queue
.lock()
.as_mut()
.and_then(|queue| queue.pop_front())
Comment on lines +564 to +567
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIOLI: it seems like queue.lock().as_mut().and_then(|queue| queue.pop_front()) could be worth making a method on Shared, but 🤷‍♀️

})
}
}

Expand Down Expand Up @@ -627,7 +635,10 @@ impl Drop for LocalSet {
task.shutdown();
}

for task in self.context.shared.queue.lock().drain(..) {
// Take the queue from the Shared object to prevent pushing
// notifications to it in the future.
let queue = self.context.shared.queue.lock().take().unwrap();
for task in queue {
task.shutdown();
}

Expand Down Expand Up @@ -677,8 +688,16 @@ impl Shared {
cx.tasks.borrow_mut().queue.push_back(task);
}
_ => {
self.queue.lock().push_back(task);
self.waker.wake();
// First check whether the queue is still there (if not, the
// LocalSet is dropped). Then push to it if so, and if not,
// do nothing.
let mut lock = self.queue.lock();

if let Some(queue) = lock.as_mut() {
queue.push_back(task);
drop(lock);
self.waker.wake();
}
}
});
}
Expand Down