Skip to content

Commit

Permalink
Fix tokio-rs#2535 with a shutdown method instead of weak pointers.
Browse files Browse the repository at this point in the history
  • Loading branch information
emgre committed Jul 23, 2020
1 parent 0890836 commit a018587
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 11 deletions.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@ members = [
"tokio-macros",
"tokio-test",
"tokio-util",
"tokio-hello-world-leak",

# Internal
"benches",
"examples",
"tests-build",
"tests-integration",
]
[patch.crates-io]
tokio = { path = "tokio" }
tokio-macros = { path = "tokio-macros" }
tokio-test = { path = "tokio-test" }
tokio-util = { path = "tokio-util" }
4 changes: 4 additions & 0 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ impl Park for Driver {
self.turn(Some(duration))?;
Ok(())
}

fn shutdown(&mut self) {

}
}

impl fmt::Debug for Driver {
Expand Down
1 change: 0 additions & 1 deletion tokio/src/loom/std/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ pub(crate) mod rand {

pub(crate) mod sync {
pub(crate) use std::sync::Arc;
pub(crate) use std::sync::Weak;

// Below, make sure all the feature-influenced types are exported for
// internal use. Note however that some are not _currently_ named by
Expand Down
7 changes: 7 additions & 0 deletions tokio/src/park/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ where
Either::B(b) => b.park_timeout(duration).map_err(Either::B),
}
}

fn shutdown(&mut self) {
match self {
Either::A(a) => a.shutdown(),
Either::B(b) => b.shutdown(),
}
}
}

impl<A, B> Unpark for Either<A, B>
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/park/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ pub(crate) trait Park {
/// an implementation detail. Refer to the documentation for the specific
/// `Park` implementation
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>;

/// Release all resources holded by the parker for proper leak-free shutdown
fn shutdown(&mut self);
}

/// Unblock a thread blocked by the associated `Park` instance.
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/park/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ impl Park for ParkThread {
self.inner.park_timeout(duration);
Ok(())
}

fn shutdown(&mut self) {
self.inner.shutdown();
}
}

// ==== impl Inner ====
Expand Down Expand Up @@ -188,6 +192,10 @@ impl Inner {

self.condvar.notify_one()
}

fn shutdown(&self) {
self.condvar.notify_all();
}
}

impl Default for ParkThread {
Expand Down Expand Up @@ -259,6 +267,10 @@ cfg_block_on! {
self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?;
Ok(())
}

fn shutdown(&mut self) {
let _ = self.with_current(|park_thread| park_thread.inner.shutdown());
}
}


Expand Down
22 changes: 16 additions & 6 deletions tokio/src/runtime/park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! A combination of the various resource driver park handles.

use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex, Weak};
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::park::{Park, Unpark};
use crate::runtime::time;
Expand All @@ -17,7 +17,7 @@ pub(crate) struct Parker {
}

pub(crate) struct Unparker {
inner: Weak<Inner>,
inner: Arc<Inner>,
}

struct Inner {
Expand Down Expand Up @@ -85,7 +85,7 @@ impl Park for Parker {

fn unpark(&self) -> Unparker {
Unparker {
inner: Arc::downgrade(&self.inner),
inner: self.inner.clone(),
}
}

Expand All @@ -104,13 +104,15 @@ impl Park for Parker {
Ok(())
}
}

fn shutdown(&mut self) {
self.inner.shutdown();
}
}

impl Unpark for Unparker {
fn unpark(&self) {
if let Some(inner) = self.inner.upgrade() {
inner.unpark();
}
self.inner.unpark();
}
}

Expand Down Expand Up @@ -244,4 +246,12 @@ impl Inner {
fn unpark_driver(&self) {
self.shared.handle.unpark();
}

fn shutdown(&self) {
if let Some(mut driver) = self.shared.driver.try_lock() {
driver.shutdown();
}

self.condvar.notify_all();
}
}
2 changes: 2 additions & 0 deletions tokio/src/runtime/thread_pool/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,8 @@ impl Core {

// Drain the queue
while self.next_local_task().is_some() {}

park.shutdown();
}

fn drain_pending_drop(&mut self, worker: &Worker) {
Expand Down
27 changes: 23 additions & 4 deletions tokio/src/time/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use std::{cmp, fmt};
/// [timeout]: crate::time::Timeout
/// [interval]: crate::time::Interval
#[derive(Debug)]
pub(crate) struct Driver<T> {
pub(crate) struct Driver<T: Park> {
/// Shared state
inner: Arc<Inner>,

Expand All @@ -94,6 +94,9 @@ pub(crate) struct Driver<T> {

/// Source of "now" instances
clock: Clock,

/// True if the driver is being shutdown
is_shutdown: bool,
}

/// Timer state shared between `Driver`, `Handle`, and `Registration`.
Expand Down Expand Up @@ -135,6 +138,7 @@ where
wheel: wheel::Wheel::new(),
park,
clock,
is_shutdown: false,
}
}

Expand Down Expand Up @@ -303,10 +307,12 @@ where

Ok(())
}
}

impl<T> Drop for Driver<T> {
fn drop(&mut self) {
fn shutdown(&mut self) {
if self.is_shutdown {
return;
}

use std::u64;

// Shutdown the stack of entries to process, preventing any new entries
Expand All @@ -319,6 +325,19 @@ impl<T> Drop for Driver<T> {
while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
entry.error();
}

self.park.shutdown();

self.is_shutdown = true;
}
}

impl<T> Drop for Driver<T>
where
T: Park,
{
fn drop(&mut self) {
self.shutdown();
}
}

Expand Down
8 changes: 8 additions & 0 deletions tokio/src/time/tests/test_delay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ fn unpark_is_delayed() {
self.0.advance(ms(436));
Ok(())
}

fn shutdown(&mut self) {

}
}

impl Unpark for MockUnpark {
Expand Down Expand Up @@ -436,6 +440,10 @@ impl Park for MockPark {
self.0.advance(duration);
Ok(())
}

fn shutdown(&mut self) {

}
}

impl Unpark for MockUnpark {
Expand Down

0 comments on commit a018587

Please sign in to comment.