Skip to content

Commit

Permalink
rt: fix potential leak during runtime shutdown (#2649)
Browse files Browse the repository at this point in the history
JoinHandle of threads created by the pool are now tracked and properly joined at
shutdown. If the thread does not return within the timeout, then it's not joined and
left to the OS for cleanup.

Also, break a cycle between wakers held by the timer and the runtime.

Fixes #2641, #2535
  • Loading branch information
emgre committed Jul 29, 2020
1 parent 1562bb3 commit 646fbae
Show file tree
Hide file tree
Showing 16 changed files with 139 additions and 24 deletions.
4 changes: 2 additions & 2 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ process = [
"winapi/winerror",
]
# Includes basic task execution capabilities
rt-core = []
rt-core = ["slab"]
rt-util = []
rt-threaded = [
"num_cpus",
Expand Down Expand Up @@ -129,7 +129,7 @@ proptest = "0.9.4"
tempfile = "3.1.0"

[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.3.4", features = ["futures", "checkpoint"] }
loom = { version = "0.3.5", features = ["futures", "checkpoint"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ impl Park for Driver {
self.turn(Some(duration))?;
Ok(())
}

fn shutdown(&mut self) {}
}

impl fmt::Debug for Driver {
Expand Down
7 changes: 7 additions & 0 deletions tokio/src/park/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ where
Either::B(b) => b.park_timeout(duration).map_err(Either::B),
}
}

fn shutdown(&mut self) {
match self {
Either::A(a) => a.shutdown(),
Either::B(b) => b.shutdown(),
}
}
}

impl<A, B> Unpark for Either<A, B>
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/park/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ pub(crate) trait Park {
/// an implementation detail. Refer to the documentation for the specific
/// `Park` implementation
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>;

/// Release all resources holded by the parker for proper leak-free shutdown
fn shutdown(&mut self);
}

/// Unblock a thread blocked by the associated `Park` instance.
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/park/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ impl Park for ParkThread {
self.inner.park_timeout(duration);
Ok(())
}

fn shutdown(&mut self) {
self.inner.shutdown();
}
}

// ==== impl Inner ====
Expand Down Expand Up @@ -188,6 +192,10 @@ impl Inner {

self.condvar.notify_one()
}

fn shutdown(&self) {
self.condvar.notify_all();
}
}

impl Default for ParkThread {
Expand Down Expand Up @@ -259,6 +267,10 @@ cfg_block_on! {
self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?;
Ok(())
}

fn shutdown(&mut self) {
let _ = self.with_current(|park_thread| park_thread.inner.shutdown());
}
}


Expand Down
34 changes: 28 additions & 6 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};

use slab::Slab;

use std::collections::VecDeque;
use std::fmt;
use std::time::Duration;
Expand Down Expand Up @@ -41,6 +43,7 @@ struct Inner {
/// Call before a thread stops
before_stop: Option<Callback>,

// Maximum number of threads
thread_cap: usize,
}

Expand All @@ -51,6 +54,7 @@ struct Shared {
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
worker_threads: Slab<thread::JoinHandle<()>>,
}

type Task = task::Notified<NoopSchedule>;
Expand Down Expand Up @@ -96,6 +100,7 @@ impl BlockingPool {
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
worker_threads: Slab::new(),
}),
condvar: Condvar::new(),
thread_name: builder.thread_name.clone(),
Expand Down Expand Up @@ -126,10 +131,15 @@ impl BlockingPool {
shared.shutdown = true;
shared.shutdown_tx = None;
self.spawner.inner.condvar.notify_all();
let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new());

drop(shared);

self.shutdown_rx.wait(timeout);
if self.shutdown_rx.wait(timeout) {
for handle in workers.drain() {
let _ = handle.join();
}
}
}
}

Expand Down Expand Up @@ -187,13 +197,23 @@ impl Spawner {
};

if let Some(shutdown_tx) = shutdown_tx {
self.spawn_thread(shutdown_tx, rt);
let mut shared = self.inner.shared.lock().unwrap();
let entry = shared.worker_threads.vacant_entry();

let handle = self.spawn_thread(shutdown_tx, rt, entry.key());

entry.insert(handle);
}

Ok(())
}

fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) {
fn spawn_thread(
&self,
shutdown_tx: shutdown::Sender,
rt: &Handle,
worker_id: usize,
) -> thread::JoinHandle<()> {
let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());

if let Some(stack_size) = self.inner.stack_size {
Expand All @@ -207,16 +227,16 @@ impl Spawner {
// Only the reference should be moved into the closure
let rt = &rt;
rt.enter(move || {
rt.blocking_spawner.inner.run();
rt.blocking_spawner.inner.run(worker_id);
drop(shutdown_tx);
})
})
.unwrap();
.unwrap()
}
}

impl Inner {
fn run(&self) {
fn run(&self, worker_id: usize) {
if let Some(f) = &self.after_start {
f()
}
Expand Down Expand Up @@ -252,6 +272,8 @@ impl Inner {
// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
shared.worker_threads.remove(worker_id);

break 'main;
}

Expand Down
11 changes: 7 additions & 4 deletions tokio/src/runtime/blocking/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,21 @@ impl Receiver {
/// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
/// duration. If `timeout` is `None`, then the thread is blocked until the
/// shutdown signal is received.
pub(crate) fn wait(&mut self, timeout: Option<Duration>) {
///
/// If the timeout has elapsed, it returns `false`, otherwise it returns `true`.
pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool {
use crate::runtime::enter::try_enter;

if timeout == Some(Duration::from_nanos(0)) {
return;
return true;
}

let mut e = match try_enter(false) {
Some(enter) => enter,
_ => {
if std::thread::panicking() {
// Don't panic in a panic
return;
return false;
} else {
panic!(
"Cannot drop a runtime in a context where blocking is not allowed. \
Expand All @@ -60,9 +62,10 @@ impl Receiver {
// current thread (usually, shutting down a runtime stored in a
// thread-local).
if let Some(timeout) = timeout {
let _ = e.block_on_timeout(&mut self.rx, timeout);
e.block_on_timeout(&mut self.rx, timeout).is_ok()
} else {
let _ = e.block_on(&mut self.rx);
true
}
}
}
9 changes: 4 additions & 5 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,11 +542,10 @@ impl Runtime {
/// runtime.shutdown_timeout(Duration::from_millis(100));
/// }
/// ```
pub fn shutdown_timeout(self, duration: Duration) {
let Runtime {
mut blocking_pool, ..
} = self;
blocking_pool.shutdown(Some(duration));
pub fn shutdown_timeout(mut self, duration: Duration) {
// Wakeup and shutdown all the worker threads
self.handle.spawner.shutdown();
self.blocking_pool.shutdown(Some(duration));
}

/// Shutdown the runtime, without waiting for any spawned tasks to shutdown.
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/runtime/park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ impl Park for Parker {
Ok(())
}
}

fn shutdown(&mut self) {
self.inner.shutdown();
}
}

impl Unpark for Unparker {
Expand Down Expand Up @@ -242,4 +246,12 @@ impl Inner {
fn unpark_driver(&self) {
self.shared.handle.unpark();
}

fn shutdown(&self) {
if let Some(mut driver) = self.shared.driver.try_lock() {
driver.shutdown();
}

self.condvar.notify_all();
}
}
11 changes: 11 additions & 0 deletions tokio/src/runtime/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ pub(crate) enum Spawner {
ThreadPool(thread_pool::Spawner),
}

impl Spawner {
pub(crate) fn shutdown(&mut self) {
#[cfg(feature = "rt-threaded")]
{
if let Spawner::ThreadPool(spawner) = self {
spawner.shutdown();
}
}
}
}

cfg_rt_core! {
impl Spawner {
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
Expand Down
6 changes: 5 additions & 1 deletion tokio/src/runtime/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl fmt::Debug for ThreadPool {

impl Drop for ThreadPool {
fn drop(&mut self) {
self.spawner.shared.close();
self.spawner.shutdown();
}
}

Expand All @@ -108,6 +108,10 @@ impl Spawner {
self.shared.schedule(task, false);
handle
}

pub(crate) fn shutdown(&mut self) {
self.shared.close();
}
}

impl fmt::Debug for Spawner {
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/thread_pool/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,8 @@ impl Core {

// Drain the queue
while self.next_local_task().is_some() {}

park.shutdown();
}

fn drain_pending_drop(&mut self, worker: &Worker) {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/time/driver/atomic_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl Iterator for AtomicStackEntries {
type Item = Arc<Entry>;

fn next(&mut self) -> Option<Self::Item> {
if self.ptr.is_null() {
if self.ptr.is_null() || self.ptr == SHUTDOWN {
return None;
}

Expand Down
27 changes: 23 additions & 4 deletions tokio/src/time/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use std::{cmp, fmt};
/// [timeout]: crate::time::Timeout
/// [interval]: crate::time::Interval
#[derive(Debug)]
pub(crate) struct Driver<T> {
pub(crate) struct Driver<T: Park> {
/// Shared state
inner: Arc<Inner>,

Expand All @@ -94,6 +94,9 @@ pub(crate) struct Driver<T> {

/// Source of "now" instances
clock: Clock,

/// True if the driver is being shutdown
is_shutdown: bool,
}

/// Timer state shared between `Driver`, `Handle`, and `Registration`.
Expand Down Expand Up @@ -135,6 +138,7 @@ where
wheel: wheel::Wheel::new(),
park,
clock,
is_shutdown: false,
}
}

Expand Down Expand Up @@ -303,10 +307,12 @@ where

Ok(())
}
}

impl<T> Drop for Driver<T> {
fn drop(&mut self) {
fn shutdown(&mut self) {
if self.is_shutdown {
return;
}

use std::u64;

// Shutdown the stack of entries to process, preventing any new entries
Expand All @@ -319,6 +325,19 @@ impl<T> Drop for Driver<T> {
while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
entry.error(Error::shutdown());
}

self.park.shutdown();

self.is_shutdown = true;
}
}

impl<T> Drop for Driver<T>
where
T: Park,
{
fn drop(&mut self) {
self.shutdown();
}
}

Expand Down

0 comments on commit 646fbae

Please sign in to comment.