Skip to content

Commit

Permalink
test for shutdown in handle::spawn and output correct JoinError if ne…
Browse files Browse the repository at this point in the history
…cessary
  • Loading branch information
b-naber committed Apr 16, 2021
1 parent 08f1b67 commit 2e65698
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 1 deletion.
6 changes: 6 additions & 0 deletions tokio/src/runtime/blocking/pool.rs
Expand Up @@ -260,6 +260,12 @@ impl Spawner {
})
.unwrap()
}

pub(crate) fn is_shutdown(&self) -> bool {
let inner = self.inner.clone();
let shared = inner.shared.lock();
shared.shutdown
}
}

impl Inner {
Expand Down
8 changes: 8 additions & 0 deletions tokio/src/runtime/handle.rs
Expand Up @@ -144,6 +144,10 @@ impl Handle {
F: Future + Send + 'static,
F::Output: Send + 'static,
{
if self.is_shutdown() {
return JoinHandle::new_dropped_runtime();
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task");
self.spawner.spawn(future)
Expand Down Expand Up @@ -289,6 +293,10 @@ impl Handle {
pub(crate) fn shutdown(mut self) {
self.spawner.shutdown();
}

pub(crate) fn is_shutdown(&self) -> bool {
self.blocking_spawner.is_shutdown()
}
}

/// Error returned by `try_current` when no Runtime has been started
Expand Down
16 changes: 16 additions & 0 deletions tokio/src/runtime/task/error.rs
Expand Up @@ -13,6 +13,7 @@ cfg_rt! {
enum Repr {
Cancelled,
Panic(Mutex<Box<dyn Any + Send + 'static>>),
DroppedRuntime,
}

impl JoinError {
Expand All @@ -28,11 +29,23 @@ impl JoinError {
}
}

pub(crate) fn dropped_runtime() -> JoinError {
JoinError {
repr: Repr::DroppedRuntime,
}
}

/// Returns true if the error was caused by the task being cancelled
pub fn is_cancelled(&self) -> bool {
matches!(&self.repr, Repr::Cancelled)
}

/// Returns true if the the runtime was dropped before the task
/// could execute.
pub fn is_dropped_runtime(&self) -> bool {
matches!(&self.repr, Repr::DroppedRuntime)
}

/// Returns true if the error was caused by the task panicking
///
/// # Examples
Expand Down Expand Up @@ -117,6 +130,7 @@ impl fmt::Display for JoinError {
match &self.repr {
Repr::Cancelled => write!(fmt, "cancelled"),
Repr::Panic(_) => write!(fmt, "panic"),
Repr::DroppedRuntime => write!(fmt, "dropped runtime"),
}
}
}
Expand All @@ -126,6 +140,7 @@ impl fmt::Debug for JoinError {
match &self.repr {
Repr::Cancelled => write!(fmt, "JoinError::Cancelled"),
Repr::Panic(_) => write!(fmt, "JoinError::Panic(...)"),
Repr::DroppedRuntime => write!(fmt, "JoinError::DroppedRuntime"),
}
}
}
Expand All @@ -139,6 +154,7 @@ impl From<JoinError> for io::Error {
match src.repr {
Repr::Cancelled => "task was cancelled",
Repr::Panic(_) => "task panicked",
Repr::DroppedRuntime => "runtime was dropped before task could be spawned",
},
)
}
Expand Down
18 changes: 18 additions & 0 deletions tokio/src/runtime/task/join.rs
Expand Up @@ -6,6 +6,10 @@ use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};

cfg_rt! {
use crate::runtime::task::error::JoinError;
}

cfg_rt! {
/// An owned permission to join on a task (await its termination).
///
Expand Down Expand Up @@ -144,6 +148,7 @@ cfg_rt! {
pub struct JoinHandle<T> {
raw: Option<RawTask>,
_p: PhantomData<T>,
runtime_dropped: bool,
}
}

Expand All @@ -155,6 +160,15 @@ impl<T> JoinHandle<T> {
JoinHandle {
raw: Some(raw),
_p: PhantomData,
runtime_dropped: false,
}
}

pub(crate) fn new_dropped_runtime() -> JoinHandle<T> {
JoinHandle {
raw: None,
_p: PhantomData,
runtime_dropped: true,
}
}

Expand Down Expand Up @@ -203,6 +217,10 @@ impl<T> Future for JoinHandle<T> {
type Output = super::Result<T>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.runtime_dropped {
return Poll::Ready(Err(JoinError::dropped_runtime()));
}

let mut ret = Poll::Pending;

// Keep track of task budget
Expand Down
23 changes: 22 additions & 1 deletion tokio/tests/rt_handle_block_on.rs
Expand Up @@ -8,7 +8,7 @@
// When this has been fixed we want to re-enable these tests.

use std::time::Duration;
use tokio::runtime::{Handle, Runtime};
use tokio::runtime::{Builder, Handle, Runtime};
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
use tokio::{fs, net, time};
Expand Down Expand Up @@ -388,6 +388,27 @@ rt_test! {

rt.block_on(async { some_non_async_function() });
}

#[test]
fn spawn_after_runtime_dropped() {
use futures::executor::block_on;

async fn foo() {
println!("hello world");
}

let rt = Builder::new_current_thread().build().unwrap();

let handle = rt.block_on(async move {
Handle::current()
});

drop(rt);

let err = block_on(handle.spawn(foo())).unwrap_err();
assert!(err.is_dropped_runtime());
}

}

multi_threaded_rt_test! {
Expand Down

0 comments on commit 2e65698

Please sign in to comment.