From 1eb468be080fb955d66b953ade24289b915c4abd Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Thu, 22 Jul 2021 12:05:02 +0200 Subject: [PATCH] task: fix leak in LocalSet (#3978) --- tokio/src/runtime/tests/loom_local.rs | 47 +++++++++++++++++++++++++++ tokio/src/runtime/tests/mod.rs | 1 + tokio/src/task/local.rs | 33 +++++++++++++++---- 3 files changed, 74 insertions(+), 7 deletions(-) create mode 100644 tokio/src/runtime/tests/loom_local.rs diff --git a/tokio/src/runtime/tests/loom_local.rs b/tokio/src/runtime/tests/loom_local.rs new file mode 100644 index 00000000000..d9a07a45f05 --- /dev/null +++ b/tokio/src/runtime/tests/loom_local.rs @@ -0,0 +1,47 @@ +use crate::runtime::tests::loom_oneshot as oneshot; +use crate::runtime::Builder; +use crate::task::LocalSet; + +use std::task::Poll; + +/// Waking a runtime will attempt to push a task into a queue of notifications +/// in the runtime, however the tasks in such a queue usually have a reference +/// to the runtime itself. This means that if they are not properly removed at +/// runtime shutdown, this will cause a memory leak. +/// +/// This test verifies that waking something during shutdown of a LocalSet does +/// not result in tasks lingering in the queue once shutdown is complete. This +/// is verified using loom's leak finder. +#[test] +fn wake_during_shutdown() { + loom::model(|| { + let rt = Builder::new_current_thread().build().unwrap(); + let ls = LocalSet::new(); + + let (send, recv) = oneshot::channel(); + + ls.spawn_local(async move { + let mut send = Some(send); + + let () = futures::future::poll_fn(|cx| { + if let Some(send) = send.take() { + send.send(cx.waker().clone()); + } + + Poll::Pending + }) + .await; + }); + + let handle = loom::thread::spawn(move || { + let waker = recv.recv(); + waker.wake(); + }); + + ls.block_on(&rt, crate::task::yield_now()); + + drop(ls); + handle.join().unwrap(); + drop(rt); + }); +} diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index ad7cf20272d..3f2cc9825e8 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -21,6 +21,7 @@ mod joinable_wrapper { cfg_loom! { mod loom_basic_scheduler; + mod loom_local; mod loom_blocking; mod loom_oneshot; mod loom_pool; diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 349624abe3b..37c2c508ad3 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -242,7 +242,7 @@ struct Tasks { /// LocalSet state shared between threads. struct Shared { /// Remote run queue sender - queue: Mutex>>>, + queue: Mutex>>>>, /// Wake the `LocalSet` task waker: AtomicWaker, @@ -338,7 +338,7 @@ impl LocalSet { queue: VecDeque::with_capacity(INITIAL_CAPACITY), }), shared: Arc::new(Shared { - queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), + queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), waker: AtomicWaker::new(), }), }, @@ -538,7 +538,8 @@ impl LocalSet { .shared .queue .lock() - .pop_front() + .as_mut() + .and_then(|queue| queue.pop_front()) .or_else(|| self.context.tasks.borrow_mut().queue.pop_front()) } else { self.context @@ -546,7 +547,14 @@ impl LocalSet { .borrow_mut() .queue .pop_front() - .or_else(|| self.context.shared.queue.lock().pop_front()) + .or_else(|| { + self.context + .shared + .queue + .lock() + .as_mut() + .and_then(|queue| queue.pop_front()) + }) } } @@ -610,7 +618,10 @@ impl Drop for LocalSet { task.shutdown(); } - for task in self.context.shared.queue.lock().drain(..) { + // Take the queue from the Shared object to prevent pushing + // notifications to it in the future. + let queue = self.context.shared.queue.lock().take().unwrap(); + for task in queue { task.shutdown(); } @@ -660,8 +671,16 @@ impl Shared { cx.tasks.borrow_mut().queue.push_back(task); } _ => { - self.queue.lock().push_back(task); - self.waker.wake(); + // First check whether the queue is still there (if not, the + // LocalSet is dropped). Then push to it if so, and if not, + // do nothing. + let mut lock = self.queue.lock(); + + if let Some(queue) = lock.as_mut() { + queue.push_back(task); + drop(lock); + self.waker.wake(); + } } }); }