From d85ea7425e6c256759c41373a988fd774f028775 Mon Sep 17 00:00:00 2001
From: David Pedersen
Date: Sat, 13 Mar 2021 16:40:46 +0100
Subject: [PATCH] Start over with new requirements

---
 tokio/src/io/driver/mod.rs          |  25 ++
 tokio/src/io/driver/registration.rs |  11 +-
 tokio/src/runtime/handle.rs         |  12 +-
 tokio/src/runtime/mod.rs            |   2 +-
 tokio/src/time/driver/entry.rs      |   4 +
 tokio/src/time/driver/handle.rs     |  19 +-
 tokio/src/time/driver/mod.rs        |  71 ++--
 tokio/src/time/driver/tests/mod.rs  |  14 +-
 tokio/src/util/error.rs             |   4 +
 tokio/tests/rt_handle_block_on.rs   | 491 ++++++++++++++++++++++++++++
 10 files changed, 607 insertions(+), 46 deletions(-)
 create mode 100644 tokio/tests/rt_handle_block_on.rs

diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index fa2d4208c72..93f647d4273 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -14,6 +14,7 @@ pub(crate) use registration::Registration;
 mod scheduled_io;
 use scheduled_io::ScheduledIo;

+use crate::loom::sync::atomic::{AtomicBool, Ordering};
 use crate::park::{Park, Unpark};
 use crate::util::slab::{self, Slab};
 use crate::{loom::sync::Mutex, util::bit};
@@ -73,6 +74,9 @@ pub(super) struct Inner {

     /// Used to wake up the reactor from a call to `turn`
     waker: mio::Waker,
+
+    /// Whether the driver is shut down.
+    is_shutdown: AtomicBool,
 }

 #[derive(Debug, Eq, PartialEq, Clone, Copy)]
@@ -129,6 +133,7 @@ impl Driver {
                 registry,
                 io_dispatch: allocator,
                 waker,
+                is_shutdown: AtomicBool::new(false),
             }),
         })
     }
@@ -208,6 +213,8 @@ impl Drop for Driver {

 impl Drop for Inner {
     fn drop(&mut self) {
+        self.is_shutdown.store(true, Ordering::SeqCst);
+
         let resources = self.resources.lock().take();

         if let Some(mut slab) = resources {
@@ -297,6 +304,24 @@ impl Handle {
     pub(super) fn inner(&self) -> Option<Arc<Inner>> {
         self.inner.upgrade()
     }
+
+    pub(crate) fn shutdown(self) {
+        if let Some(inner) = self.inner.upgrade() {
+            inner
+                .is_shutdown
+                .store(true, crate::loom::sync::atomic::Ordering::SeqCst);
+        }
+    }
+
+    pub(crate) fn is_shutdown(&self) -> bool {
+        if let Some(inner) = self.inner.upgrade() {
+            inner.is_shutdown.load(Ordering::SeqCst)
+        } else {
+            // If the inner type has been dropped, its `Drop` impl will have run and set
+            // `Inner.is_shutdown` to `true`, so the driver must have been shut down.
+            true
+        }
+    }
 }

 impl Unpark for Handle {
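The hunk above is the heart of the I/O driver change: the shared `Inner` gains an atomic shutdown flag, `Drop` sets it, and the `Handle`, which only holds a `Weak` reference to `Inner`, reports "shut down" whenever the upgrade fails. A minimal self-contained sketch of that pattern (illustrative names, not Tokio's actual types):

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Weak};

    struct Inner {
        is_shutdown: AtomicBool,
    }

    impl Drop for Inner {
        // Mirrors the patch: dropping the driver marks it as shut down.
        fn drop(&mut self) {
            self.is_shutdown.store(true, Ordering::SeqCst);
        }
    }

    #[derive(Clone)]
    struct Handle {
        inner: Weak<Inner>,
    }

    impl Handle {
        fn is_shutdown(&self) -> bool {
            match self.inner.upgrade() {
                // Driver still alive: read the flag.
                Some(inner) => inner.is_shutdown.load(Ordering::SeqCst),
                // Driver dropped: its `Drop` ran, so it is necessarily shut down.
                None => true,
            }
        }
    }

    fn main() {
        let driver = Arc::new(Inner { is_shutdown: AtomicBool::new(false) });
        let handle = Handle { inner: Arc::downgrade(&driver) };
        assert!(!handle.is_shutdown());
        drop(driver);
        assert!(handle.is_shutdown());
    }

Both branches agree once the driver is gone, which is why the patch can treat a failed `Weak::upgrade` as equivalent to an explicit shutdown.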
diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs
index 1451224598c..66ffb2f25ed 100644
--- a/tokio/src/io/driver/registration.rs
+++ b/tokio/src/io/driver/registration.rs
@@ -54,7 +54,7 @@ unsafe impl Sync for Registration {}
 impl Registration {
     /// Registers the I/O resource with the default reactor, for a specific
     /// `Interest`. `new_with_interest` should be used over `new` when you need
     /// control over the readiness state, such as when a file descriptor only
     /// allows reads. This does not add `hup` or `error` so if you are
     /// interested in those states, you will need to add them to the readiness
     /// state passed to this function.
@@ -69,6 +69,13 @@ impl Registration {
         interest: Interest,
         handle: Handle,
     ) -> io::Result<Registration> {
+        if handle.is_shutdown() {
+            return Err(io::Error::new(
+                io::ErrorKind::Other,
+                crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
+            ));
+        }
+
         let shared = if let Some(inner) = handle.inner() {
             inner.add_source(io, interest)?
         } else {
@@ -232,7 +239,7 @@ cfg_io_readiness! {
         pin!(fut);

         crate::future::poll_fn(|cx| {
-            if self.handle.inner().is_none() {
+            if self.handle.inner().is_none() || self.handle.is_shutdown() {
                 return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
             }

diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs
index 11f921d0354..066fff51653 100644
--- a/tokio/src/runtime/handle.rs
+++ b/tokio/src/runtime/handle.rs
@@ -211,7 +211,17 @@ impl Handle {
         let mut _blocking_enter = crate::runtime::enter(true);

         // Block on the future
-        _blocking_enter.block_on(future).expect("failed to park thread")
+        _blocking_enter
+            .block_on(future)
+            .expect("failed to park thread")
+    }
+
+    pub(crate) fn shutdown(mut self) {
+        self.spawner.shutdown();
+
+        if let Some(io_handle) = self.io_handle {
+            io_handle.shutdown();
+        }
     }
 }

diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs
index b138a66455c..2075d487720 100644
--- a/tokio/src/runtime/mod.rs
+++ b/tokio/src/runtime/mod.rs
@@ -526,7 +526,7 @@ cfg_rt! {
         /// ```
         pub fn shutdown_timeout(mut self, duration: Duration) {
             // Wakeup and shutdown all the worker threads
-            self.handle.spawner.shutdown();
+            self.handle.shutdown();

             self.blocking_pool.shutdown(Some(duration));
         }

diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs
index 11366d2ada3..e840956c146 100644
--- a/tokio/src/time/driver/entry.rs
+++ b/tokio/src/time/driver/entry.rs
@@ -543,6 +543,10 @@ impl TimerEntry {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), super::Error>> {
+        if self.driver.is_shutdown() {
+            panic!(crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR);
+        }
+
         if let Some(deadline) = self.initial_deadline {
             self.as_mut().reset(deadline);
         }

diff --git a/tokio/src/time/driver/handle.rs b/tokio/src/time/driver/handle.rs
index e934b562be4..00f648e6f20 100644
--- a/tokio/src/time/driver/handle.rs
+++ b/tokio/src/time/driver/handle.rs
@@ -1,4 +1,4 @@
-use crate::loom::sync::{Arc, Mutex};
+use crate::loom::sync::Arc;
 use crate::time::driver::ClockTime;
 use std::fmt;

@@ -6,13 +6,13 @@
 #[derive(Clone)]
 pub(crate) struct Handle {
     time_source: ClockTime,
-    inner: Arc<Mutex<super::Inner>>,
+    inner: Arc<super::Inner>,
 }

 impl Handle {
     /// Creates a new timer `Handle` from a shared `Inner` timer state.
-    pub(super) fn new(inner: Arc<Mutex<super::Inner>>) -> Self {
-        let time_source = inner.lock().time_source.clone();
+    pub(super) fn new(inner: Arc<super::Inner>) -> Self {
+        let time_source = inner.state.lock().time_source.clone();
         Handle { time_source, inner }
     }

@@ -21,9 +21,14 @@ impl Handle {
         &self.time_source
     }

-    /// Locks the driver's inner structure
-    pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, super::Inner> {
-        self.inner.lock()
+    /// Access the driver's inner structure
+    pub(super) fn get(&self) -> &super::Inner {
+        &*self.inner
+    }
+
+    /// Checks whether the driver has been shut down
+    pub(super) fn is_shutdown(&self) -> bool {
+        self.inner.is_shutdown()
     }
 }

diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs
index 615307ea572..3eb100412fb 100644
--- a/tokio/src/time/driver/mod.rs
+++ b/tokio/src/time/driver/mod.rs
@@ -16,6 +16,7 @@ mod wheel;

 pub(super) mod sleep;

+use crate::loom::sync::atomic::{AtomicBool, Ordering};
 use crate::loom::sync::{Arc, Mutex};
 use crate::park::{Park, Unpark};
 use crate::time::error::Error;
@@ -86,7 +87,7 @@ pub(crate) struct Driver<P: Park + 'static> {
     time_source: ClockTime,

     /// Shared state
-    inner: Handle,
+    handle: Handle,

     /// Parker to delegate to
     park: P,
@@ -132,7 +133,16 @@ impl ClockTime {
 }

 /// Timer state shared between `Driver`, `Handle`, and `Registration`.
-pub(self) struct Inner {
+struct Inner {
+    // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
+    pub(super) state: Mutex<InnerState>,
+
+    /// True if the driver is being shutdown
+    pub(super) is_shutdown: AtomicBool,
+}
+
+/// Time state shared by the driver and handles, which must be protected by a `Mutex`
+struct InnerState {
     /// Timing backend in use
     time_source: ClockTime,

@@ -145,9 +155,6 @@ pub(self) struct Inner {
     /// Timer wheel
     wheel: wheel::Wheel,

-    /// True if the driver is being shutdown
-    is_shutdown: bool,
-
     /// Unparker that can be used to wake the time driver
     unpark: Box<dyn Unpark>,
 }
@@ -169,7 +176,7 @@ where

         Driver {
             time_source,
-            inner: Handle::new(Arc::new(Mutex::new(inner))),
+            handle: Handle::new(Arc::new(inner)),
             park,
         }
     }
@@ -181,15 +188,15 @@ where
     /// `with_default`, setting the timer as the default timer for the execution
     /// context.
     pub(crate) fn handle(&self) -> Handle {
-        self.inner.clone()
+        self.handle.clone()
     }

     fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
         let clock = &self.time_source.clock;

-        let mut lock = self.inner.lock();
+        let mut lock = self.handle.get().state.lock();

-        assert!(!lock.is_shutdown);
+        assert!(!self.handle.is_shutdown());

         let next_wake = lock.wheel.next_expiration_time();
         lock.next_wake =
@@ -237,7 +244,7 @@ where
         }

         // Process pending timers after waking up
-        self.inner.process();
+        self.handle.process();

         Ok(())
     }
@@ -255,7 +262,7 @@ impl Handle {
         let mut waker_list: [Option<Waker>; 32] = Default::default();
         let mut waker_idx = 0;

-        let mut lock = self.lock();
+        let mut lock = self.get().lock();

         assert!(now >= lock.elapsed);
@@ -278,7 +285,7 @@ impl Handle {

                     waker_idx = 0;

-                    lock = self.lock();
+                    lock = self.get().lock();
                 }
             }
         }
@@ -309,7 +316,7 @@ impl Handle {
     /// `add_entry` must not be called concurrently.
     pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
         unsafe {
-            let mut lock = self.lock();
+            let mut lock = self.get().lock();

             if entry.as_ref().might_be_registered() {
                 lock.wheel.remove(entry);
@@ -327,7 +334,7 @@ impl Handle {
     /// the `TimerEntry`)
     pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) {
         let waker = unsafe {
-            let mut lock = self.lock();
+            let mut lock = self.get().lock();

             // We may have raced with a firing/deregistration, so check before
             // deregistering.
@@ -338,7 +345,7 @@ impl Handle {
             // Now that we have exclusive control of this entry, mint a handle to reinsert it.
             let entry = entry.as_ref().handle();

-            if lock.is_shutdown {
+            if self.is_shutdown() {
                 unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
             } else {
                 entry.set_expiration(new_tick);
@@ -396,19 +403,15 @@ where
     }

     fn shutdown(&mut self) {
-        let mut lock = self.inner.lock();
-
-        if lock.is_shutdown {
+        if self.handle.is_shutdown() {
             return;
         }

-        lock.is_shutdown = true;
-
-        drop(lock);
+        self.handle.get().is_shutdown.store(true, Ordering::SeqCst);

         // Advance time forward to the end of time.
-        self.inner.process_at_time(u64::MAX);
+        self.handle.process_at_time(u64::MAX);

         self.park.shutdown();
     }
@@ -428,14 +431,26 @@ where

 impl Inner {
     pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self {
         Inner {
-            time_source,
-            elapsed: 0,
-            next_wake: None,
-            unpark,
-            wheel: wheel::Wheel::new(),
-            is_shutdown: false,
+            state: Mutex::new(InnerState {
+                time_source,
+                elapsed: 0,
+                next_wake: None,
+                unpark,
+                wheel: wheel::Wheel::new(),
+            }),
+            is_shutdown: AtomicBool::new(false),
         }
     }
+
+    /// Locks the driver's inner structure
+    pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
+        self.state.lock()
+    }
+
+    /// Checks whether the driver has been shut down
+    pub(super) fn is_shutdown(&self) -> bool {
+        self.is_shutdown.load(Ordering::SeqCst)
+    }
 }

 impl fmt::Debug for Inner {
diff --git a/tokio/src/time/driver/tests/mod.rs b/tokio/src/time/driver/tests/mod.rs
index 8ae4a84b442..7c5cf1fd05c 100644
--- a/tokio/src/time/driver/tests/mod.rs
+++ b/tokio/src/time/driver/tests/mod.rs
@@ -3,7 +3,7 @@ use std::{task::Context, time::Duration};
 #[cfg(not(loom))]
 use futures::task::noop_waker_ref;

-use crate::loom::sync::{Arc, Mutex};
+use crate::loom::sync::Arc;
 use crate::loom::thread;
 use crate::{
     loom::sync::atomic::{AtomicBool, Ordering},
@@ -45,7 +45,7 @@ fn single_timer() {
         let time_source = super::ClockTime::new(clock.clone());

         let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
-        let handle = Handle::new(Arc::new(Mutex::new(inner)));
+        let handle = Handle::new(Arc::new(inner));

         let handle_ = handle.clone();
         let jh = thread::spawn(move || {
@@ -76,7 +76,7 @@ fn drop_timer() {
         let time_source = super::ClockTime::new(clock.clone());

         let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
-        let handle = Handle::new(Arc::new(Mutex::new(inner)));
+        let handle = Handle::new(Arc::new(inner));

         let handle_ = handle.clone();
         let jh = thread::spawn(move || {
@@ -107,7 +107,7 @@ fn change_waker() {
         let time_source = super::ClockTime::new(clock.clone());

         let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
-        let handle = Handle::new(Arc::new(Mutex::new(inner)));
+        let handle = Handle::new(Arc::new(inner));

         let handle_ = handle.clone();
         let jh = thread::spawn(move || {
@@ -142,7 +142,7 @@ fn reset_future() {
         let time_source = super::ClockTime::new(clock.clone());

         let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
-        let handle = Handle::new(Arc::new(Mutex::new(inner)));
+        let handle = Handle::new(Arc::new(inner));

         let handle_ = handle.clone();
         let finished_early_ = finished_early.clone();
@@ -191,7 +191,7 @@ fn poll_process_levels() {
     let time_source = super::ClockTime::new(clock.clone());

     let inner = super::Inner::new(time_source, MockUnpark::mock());
-    let handle = Handle::new(Arc::new(Mutex::new(inner)));
+    let handle = Handle::new(Arc::new(inner));

     let mut entries = vec![];
@@ -232,7 +232,7 @@ fn poll_process_levels_targeted() {
     let time_source = super::ClockTime::new(clock.clone());

     let inner = super::Inner::new(time_source, MockUnpark::mock());
-    let handle = Handle::new(Arc::new(Mutex::new(inner)));
+    let handle = Handle::new(Arc::new(inner));

     let e1 = TimerEntry::new(&handle, clock.now() + Duration::from_millis(193));
     pin!(e1);
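The time-driver refactor above applies the same flag with one twist: `Inner` is split into a `Mutex`-protected `InnerState` plus an `AtomicBool` kept outside the lock, so `Handle::is_shutdown` can be answered without contending for the mutex on hot polling paths. A standalone sketch of that split (again with illustrative names, not the patch's actual types):

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Mutex;

    /// The portion of the driver state that still requires the lock.
    struct InnerState {
        next_wake: Option<u64>,
    }

    /// Shared driver state: the flag deliberately lives outside the mutex.
    struct Inner {
        state: Mutex<InnerState>,
        is_shutdown: AtomicBool,
    }

    impl Inner {
        /// Readable without taking the lock, like `Handle::is_shutdown` in the patch.
        fn is_shutdown(&self) -> bool {
            self.is_shutdown.load(Ordering::SeqCst)
        }

        fn shutdown(&self) {
            self.is_shutdown.store(true, Ordering::SeqCst);
        }
    }

    fn main() {
        let inner = Inner {
            state: Mutex::new(InnerState { next_wake: None }),
            is_shutdown: AtomicBool::new(false),
        };
        assert!(!inner.is_shutdown());
        inner.shutdown();
        assert!(inner.is_shutdown());
        // The mutex-protected state remains independently accessible.
        assert!(inner.state.lock().unwrap().next_wake.is_none());
    }

The flag is monotonic (it only ever goes from `false` to `true`), which is what makes the lock-free read safe to combine with the locked state.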
diff --git a/tokio/src/util/error.rs b/tokio/src/util/error.rs
index 518cb2c0aed..83375a98991 100644
--- a/tokio/src/util/error.rs
+++ b/tokio/src/util/error.rs
@@ -1,3 +1,7 @@
 /// Error string explaining that the Tokio context hasn't been instantiated.
 pub(crate) const CONTEXT_MISSING_ERROR: &str =
     "there is no reactor running, must be called from the context of a Tokio 1.x runtime";
+
+/// Error string explaining that the Tokio context is shutting down and cannot drive timers.
+pub(crate) const RUNTIME_SHUTTING_DOWN_ERROR: &str =
+    "A Tokio 1.x context was found, but it is being shutdown.";
diff --git a/tokio/tests/rt_handle_block_on.rs b/tokio/tests/rt_handle_block_on.rs
new file mode 100644
index 00000000000..ed2ceab0075
--- /dev/null
+++ b/tokio/tests/rt_handle_block_on.rs
@@ -0,0 +1,491 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use std::time::Duration;
+use tempfile::tempdir;
+use tokio::runtime::{Handle, Runtime};
+use tokio::sync::mpsc;
+use tokio::task::spawn_blocking;
+use tokio::{fs, net, time};
+
+macro_rules! multi_threaded_rt_test {
+    ($($t:tt)*) => {
+        mod threaded_scheduler_4_threads_only {
+            use super::*;
+
+            $($t)*
+
+            fn rt() -> Runtime {
+                tokio::runtime::Builder::new_multi_thread()
+                    .worker_threads(4)
+                    .enable_all()
+                    .build()
+                    .unwrap()
+            }
+        }
+
+        mod threaded_scheduler_1_thread_only {
+            use super::*;
+
+            $($t)*
+
+            fn rt() -> Runtime {
+                tokio::runtime::Builder::new_multi_thread()
+                    .worker_threads(1)
+                    .enable_all()
+                    .build()
+                    .unwrap()
+            }
+        }
+    }
+}
+
+macro_rules! rt_test {
+    ($($t:tt)*) => {
+        mod current_thread_scheduler {
+            use super::*;
+
+            $($t)*
+
+            fn rt() -> Runtime {
+                tokio::runtime::Builder::new_current_thread()
+                    .enable_all()
+                    .build()
+                    .unwrap()
+            }
+        }
+
+        mod threaded_scheduler_4_threads {
+            use super::*;
+
+            $($t)*
+
+            fn rt() -> Runtime {
+                tokio::runtime::Builder::new_multi_thread()
+                    .worker_threads(4)
+                    .enable_all()
+                    .build()
+                    .unwrap()
+            }
+        }
+
+        mod threaded_scheduler_1_thread {
+            use super::*;
+
+            $($t)*
+
+            fn rt() -> Runtime {
+                tokio::runtime::Builder::new_multi_thread()
+                    .worker_threads(1)
+                    .enable_all()
+                    .build()
+                    .unwrap()
+            }
+        }
+    }
+}
+
+// ==== runtime independent futures ======
+
+#[test]
+fn basic() {
+    test_with_runtimes(|| {
+        let one = Handle::current().block_on(async { 1 });
+        assert_eq!(1, one);
+    });
+}
+
+#[test]
+fn bounded_mpsc_channel() {
+    test_with_runtimes(|| {
+        let (tx, mut rx) = mpsc::channel(1024);
+
+        Handle::current().block_on(tx.send(42)).unwrap();
+
+        let value = Handle::current().block_on(rx.recv()).unwrap();
+        assert_eq!(value, 42);
+    });
+}
+
+#[test]
+fn unbounded_mpsc_channel() {
+    test_with_runtimes(|| {
+        let (tx, mut rx) = mpsc::unbounded_channel();
+
+        let _ = tx.send(42);
+
+        let value = Handle::current().block_on(rx.recv()).unwrap();
+        assert_eq!(value, 42);
+    })
+}
+
+rt_test! {
+    // ==== spawn blocking futures ======
+
+    #[test]
+    fn basic_fs() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        let contents = Handle::current()
+            .block_on(fs::read_to_string("Cargo.toml"))
+            .unwrap();
+        assert!(contents.contains("Cargo.toml"));
+    }
+
+    #[test]
+    fn fs_shutdown_before_started() {
+        let rt = rt();
+        let _enter = rt.enter();
+        rt.shutdown_timeout(Duration::from_secs(1000));
+
+        let err: std::io::Error = Handle::current()
+            .block_on(fs::read_to_string("Cargo.toml"))
+            .unwrap_err();
+
+        assert_eq!(err.kind(), std::io::ErrorKind::Other);
+
+        let inner_err = err.get_ref().expect("no inner error");
+        assert_eq!(inner_err.to_string(), "background task failed");
+    }
+
+    #[test]
+    fn basic_spawn_blocking() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        let answer = Handle::current()
+            .block_on(spawn_blocking(|| {
+                std::thread::sleep(Duration::from_millis(100));
+                42
+            }))
+            .unwrap();
+
+        assert_eq!(answer, 42);
+    }
+
+    #[test]
+    fn spawn_blocking_after_shutdown_fails() {
+        let rt = rt();
+        let _enter = rt.enter();
+        rt.shutdown_timeout(Duration::from_secs(1000));
+
+        let join_err = Handle::current()
+            .block_on(spawn_blocking(|| {
+                std::thread::sleep(Duration::from_millis(100));
+                42
+            }))
+            .unwrap_err();
+
+        assert!(join_err.is_cancelled());
+    }
+
+    #[test]
+    fn spawn_blocking_started_before_shutdown_continues() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        let handle = spawn_blocking(|| {
+            std::thread::sleep(Duration::from_secs(1));
+            42
+        });
+
+        rt.shutdown_timeout(Duration::from_secs(1000));
+
+        let answer = Handle::current().block_on(handle).unwrap();
+
+        assert_eq!(answer, 42);
+    }
+
+    // ==== net ======
+
+    #[test]
+    fn tcp_listener_bind() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        Handle::current()
+            .block_on(net::TcpListener::bind("127.0.0.1:0"))
+            .unwrap();
+    }
+
+    #[test]
+    fn tcp_listener_connect_after_shutdown() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        rt.shutdown_timeout(Duration::from_secs(1000));
+
+        let err = Handle::current()
+            .block_on(net::TcpListener::bind("127.0.0.1:0"))
+            .unwrap_err();
+
+        assert_eq!(err.kind(), std::io::ErrorKind::Other);
+        assert_eq!(
+            err.get_ref().unwrap().to_string(),
+            "A Tokio 1.x context was found, but it is being shutdown.",
+        );
+    }
+
+    #[test]
+    fn tcp_listener_connect_before_shutdown() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        let bind_future = net::TcpListener::bind("127.0.0.1:0");
+
+        rt.shutdown_timeout(Duration::from_secs(1000));
+
+        let err = Handle::current().block_on(bind_future).unwrap_err();
+
+        assert_eq!(err.kind(), std::io::ErrorKind::Other);
+        assert_eq!(
+            err.get_ref().unwrap().to_string(),
+            "A Tokio 1.x context was found, but it is being shutdown.",
+        );
+    }
+
+    #[test]
+    fn udp_socket_bind() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        Handle::current()
+            .block_on(net::UdpSocket::bind("127.0.0.1:0"))
+            .unwrap();
+    }
+
+    #[test]
+    fn udp_stream_bind_after_shutdown() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        rt.shutdown_timeout(Duration::from_secs(1000));
+
+        let err = Handle::current()
+            .block_on(net::UdpSocket::bind("127.0.0.1:0"))
+            .unwrap_err();
+
+        assert_eq!(err.kind(), std::io::ErrorKind::Other);
+        assert_eq!(
+            err.get_ref().unwrap().to_string(),
+            "A Tokio 1.x context was found, but it is being shutdown.",
+        );
+    }
+
+    #[test]
+    fn udp_stream_bind_before_shutdown() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        let bind_future = net::UdpSocket::bind("127.0.0.1:0");
+
+        rt.shutdown_timeout(Duration::from_secs(1000));
+
+        let err = Handle::current().block_on(bind_future).unwrap_err();
+
+        assert_eq!(err.kind(), std::io::ErrorKind::Other);
+        assert_eq!(
+            err.get_ref().unwrap().to_string(),
+            "A Tokio 1.x context was found, but it is being shutdown.",
+        );
+    }
+
+    #[cfg(unix)]
+    #[test]
+    fn unix_listener_bind_after_shutdown() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("socket");
+
+        rt.shutdown_timeout(Duration::from_secs(1000));
+
+        let err = net::UnixListener::bind(path).unwrap_err();
+
+        assert_eq!(err.kind(), std::io::ErrorKind::Other);
+        assert_eq!(
+            err.get_ref().unwrap().to_string(),
+            "A Tokio 1.x context was found, but it is being shutdown.",
+        );
+    }
+
+    #[cfg(unix)]
+    #[test]
+    fn unix_listener_bind_accept_after_shutdown_1() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("socket");
+
+        let listener = net::UnixListener::bind(path).unwrap();
+
+        rt.shutdown_timeout(Duration::from_secs(1000));
+
+        // this should not time out but fail immediately since the runtime has been shut down
+        let err = Handle::current().block_on(listener.accept()).unwrap_err();
+
+        assert_eq!(err.kind(), std::io::ErrorKind::Other);
+        assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
+    }
+
+    #[cfg(unix)]
+    #[test]
+    fn unix_listener_bind_accept_after_shutdown_2() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("socket");
+
+        let listener = net::UnixListener::bind(path).unwrap();
+
+        let accept_future = listener.accept();
+
+        rt.shutdown_timeout(Duration::from_secs(1000));
+
+        // this should not time out but fail immediately since the runtime has been shut down
+        let err = Handle::current().block_on(accept_future).unwrap_err();
+
+        assert_eq!(err.kind(), std::io::ErrorKind::Other);
+        assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
+    }
+
+    // ==== nesting ======
+
+    #[test]
+    #[should_panic(
+        expected = "Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks."
+    )]
+    fn nesting() {
+        fn some_non_async_function() -> i32 {
+            Handle::current().block_on(time::sleep(Duration::from_millis(10)));
+            1
+        }
+
+        let rt = rt();
+
+        rt.block_on(async { some_non_async_function() });
+    }
+}
+
+multi_threaded_rt_test! {
+    #[test]
+    fn unix_listener_bind() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("socket");
+
+        let listener = net::UnixListener::bind(path).unwrap();
+
+        // this should time out and not fail immediately since the runtime has not been shut down
+        let _: tokio::time::error::Elapsed = Handle::current()
+            .block_on(tokio::time::timeout(
+                Duration::from_millis(10),
+                listener.accept(),
+            ))
+            .unwrap_err();
+    }
+
+    // ==== timers ======
+
+    // `Handle::block_on` doesn't work with timer futures on a current thread runtime, as there is
+    // no thread to drive the timers, so they would just hang forever. Therefore they are not
+    // tested there.
+
+    #[test]
+    fn sleep() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        Handle::current().block_on(time::sleep(Duration::from_millis(100)));
+    }
+
+    #[test]
+    #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
+    fn sleep_before_shutdown_panics() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        let f = time::sleep(Duration::from_millis(100));
+
+        rt.shutdown_timeout(Duration::from_secs(1000));
+
+        Handle::current().block_on(f);
+    }
+
+    #[test]
+    #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
+    fn sleep_after_shutdown_panics() {
+        let rt = rt();
+        let _enter = rt.enter();
+
+        rt.shutdown_timeout(Duration::from_secs(1000));
+
+        Handle::current().block_on(time::sleep(Duration::from_millis(100)));
+    }
+}
+
+// ==== utils ======
+
+/// Create a new multi threaded runtime
+fn new_multi_thread(n: usize) -> Runtime {
+    tokio::runtime::Builder::new_multi_thread()
+        .worker_threads(n)
+        .enable_all()
+        .build()
+        .unwrap()
+}
+
+/// Create a new single threaded runtime
+fn new_current_thread() -> Runtime {
+    tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap()
+}
+
+/// Utility to test things on each kind of runtime, both before and after shutting it down.
+fn test_with_runtimes<F>(f: F)
+where
+    F: Fn(),
+{
+    {
+        println!("current thread runtime");
+
+        let rt = new_current_thread();
+        let _enter = rt.enter();
+        f();
+
+        println!("current thread runtime after shutdown");
+        rt.shutdown_timeout(Duration::from_secs(1000));
+        f();
+    }
+
+    {
+        println!("multi thread (1 thread) runtime");
+
+        let rt = new_multi_thread(1);
+        let _enter = rt.enter();
+        f();
+
+        println!("multi thread runtime after shutdown");
+        rt.shutdown_timeout(Duration::from_secs(1000));
+        f();
+    }
+
+    {
+        println!("multi thread (4 threads) runtime");
+
+        let rt = new_multi_thread(4);
+        let _enter = rt.enter();
+        f();
+
+        println!("multi thread runtime after shutdown");
+        rt.shutdown_timeout(Duration::from_secs(1000));
+        f();
+    }
+}
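Taken together, the patch gives `Handle::block_on` well-defined semantics across the whole runtime lifecycle: runtime-independent futures always complete, creating an I/O resource after shutdown fails with `RUNTIME_SHUTTING_DOWN_ERROR`, and polling a timer after shutdown panics with the same message. A small end-to-end sketch of those semantics, not part of the patch; it assumes a tokio 1.x build with the `full` feature and that `Handle::block_on` enters the runtime context on its own, as the tests above rely on:

    use std::time::Duration;
    use tokio::runtime::Builder;

    fn main() {
        let rt = Builder::new_multi_thread()
            .worker_threads(1)
            .enable_all()
            .build()
            .unwrap();

        // An owned handle stays usable after the runtime itself is consumed.
        let handle = rt.handle().clone();

        // Runtime-independent futures can be blocked on from plain sync code.
        assert_eq!(42, handle.block_on(async { 40 + 2 }));

        // Timers complete because the worker thread drives the time driver.
        handle.block_on(tokio::time::sleep(Duration::from_millis(10)));

        rt.shutdown_timeout(Duration::from_secs(1));

        // After shutdown, binding an I/O resource fails with the new error string.
        let err = handle
            .block_on(tokio::net::TcpListener::bind("127.0.0.1:0"))
            .unwrap_err();
        assert_eq!(
            err.get_ref().unwrap().to_string(),
            "A Tokio 1.x context was found, but it is being shutdown.",
        );

        // Blocking on a timer here would instead panic with that same message.
    }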