diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index db911b3e1aa..8251fe68a00 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -235,7 +235,10 @@ cfg_io_readiness! { crate::future::poll_fn(|cx| { if self.handle.inner().is_none() { - return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone"))); + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::Other, + crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR + ))); } Pin::new(&mut fut).poll(cx).map(Ok) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 5de23669b82..9f1a4589091 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -202,6 +202,93 @@ impl Handle { let _ = self.blocking_spawner.spawn(task, &self); handle } + + /// Run a future to completion on this `Handle`'s associated `Runtime`. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. + /// + /// When this is used on a `current_thread` runtime, only the + /// [`Runtime::block_on`] method can drive the IO and timer drivers, but the + /// `Handle::block_on` method cannot drive them. This means that, when using + /// this method on a current_thread runtime, anything that relies on IO or + /// timers will not work unless there is another thread currently calling + /// [`Runtime::block_on`] on the same runtime. + /// + /// # If the runtime has been shut down + /// + /// If the `Handle`'s associated `Runtime` has been shut down (through + /// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by + /// dropping it) and `Handle::block_on` is used it might return an error or + /// panic. Specifically IO resources will return an error and timers will + /// panic. Runtime independent futures will run as normal. + /// + /// # Panics + /// + /// This function panics if the provided future panics, if called within an + /// asynchronous execution context, or if a timer future is executed on a + /// runtime that has been shut down. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Get a handle from this runtime + /// let handle = rt.handle(); + /// + /// // Execute the future, blocking the current thread until completion + /// handle.block_on(async { + /// println!("hello"); + /// }); + /// ``` + /// + /// Or using `Handle::current`: + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main () { + /// let handle = Handle::current(); + /// std::thread::spawn(move || { + /// // Using Handle::block_on to run async code in the new thread. + /// handle.block_on(async { + /// println!("hello"); + /// }); + /// }); + /// } + /// ``` + /// + /// [`JoinError`]: struct@crate::task::JoinError + /// [`JoinHandle`]: struct@crate::task::JoinHandle + /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on + /// [`Runtime::shutdown_background`]: fn@crate::runtime::Runtime::shutdown_background + /// [`Runtime::shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout + /// [`spawn_blocking`]: crate::task::spawn_blocking + /// [`tokio::fs`]: crate::fs + /// [`tokio::net`]: crate::net + /// [`tokio::time`]: crate::time + pub fn block_on(&self, future: F) -> F::Output { + // Enter the **runtime** context. This configures spawning, the current I/O driver, ... + let _rt_enter = self.enter(); + + // Enter a **blocking** context. This prevents blocking from a runtime. + let mut blocking_enter = crate::runtime::enter(true); + + // Block on the future + blocking_enter + .block_on(future) + .expect("failed to park thread") + } + + pub(crate) fn shutdown(mut self) { + self.spawner.shutdown(); + } } /// Error returned by `try_current` when no Runtime has been started diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index b138a66455c..2075d487720 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -526,7 +526,7 @@ cfg_rt! { /// ``` pub fn shutdown_timeout(mut self, duration: Duration) { // Wakeup and shutdown all the worker threads - self.handle.spawner.shutdown(); + self.handle.shutdown(); self.blocking_pool.shutdown(Some(duration)); } diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs index 11366d2ada3..e840956c146 100644 --- a/tokio/src/time/driver/entry.rs +++ b/tokio/src/time/driver/entry.rs @@ -543,6 +543,10 @@ impl TimerEntry { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { + if self.driver.is_shutdown() { + panic!(crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR); + } + if let Some(deadline) = self.initial_deadline { self.as_mut().reset(deadline); } diff --git a/tokio/src/time/driver/handle.rs b/tokio/src/time/driver/handle.rs index e934b562be4..00f648e6f20 100644 --- a/tokio/src/time/driver/handle.rs +++ b/tokio/src/time/driver/handle.rs @@ -1,4 +1,4 @@ -use crate::loom::sync::{Arc, Mutex}; +use crate::loom::sync::Arc; use crate::time::driver::ClockTime; use std::fmt; @@ -6,13 +6,13 @@ use std::fmt; #[derive(Clone)] pub(crate) struct Handle { time_source: ClockTime, - inner: Arc>, + inner: Arc, } impl Handle { /// Creates a new timer `Handle` from a shared `Inner` timer state. - pub(super) fn new(inner: Arc>) -> Self { - let time_source = inner.lock().time_source.clone(); + pub(super) fn new(inner: Arc) -> Self { + let time_source = inner.state.lock().time_source.clone(); Handle { time_source, inner } } @@ -21,9 +21,14 @@ impl Handle { &self.time_source } - /// Locks the driver's inner structure - pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, super::Inner> { - self.inner.lock() + /// Access the driver's inner structure + pub(super) fn get(&self) -> &super::Inner { + &*self.inner + } + + // Check whether the driver has been shutdown + pub(super) fn is_shutdown(&self) -> bool { + self.inner.is_shutdown() } } diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 615307ea572..3eb100412fb 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -16,6 +16,7 @@ mod wheel; pub(super) mod sleep; +use crate::loom::sync::atomic::{AtomicBool, Ordering}; use crate::loom::sync::{Arc, Mutex}; use crate::park::{Park, Unpark}; use crate::time::error::Error; @@ -86,7 +87,7 @@ pub(crate) struct Driver { time_source: ClockTime, /// Shared state - inner: Handle, + handle: Handle, /// Parker to delegate to park: P, @@ -132,7 +133,16 @@ impl ClockTime { } /// Timer state shared between `Driver`, `Handle`, and `Registration`. -pub(self) struct Inner { +struct Inner { + // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex + pub(super) state: Mutex, + + /// True if the driver is being shutdown + pub(super) is_shutdown: AtomicBool, +} + +/// Time state shared which must be protected by a `Mutex` +struct InnerState { /// Timing backend in use time_source: ClockTime, @@ -145,9 +155,6 @@ pub(self) struct Inner { /// Timer wheel wheel: wheel::Wheel, - /// True if the driver is being shutdown - is_shutdown: bool, - /// Unparker that can be used to wake the time driver unpark: Box, } @@ -169,7 +176,7 @@ where Driver { time_source, - inner: Handle::new(Arc::new(Mutex::new(inner))), + handle: Handle::new(Arc::new(inner)), park, } } @@ -181,15 +188,15 @@ where /// `with_default`, setting the timer as the default timer for the execution /// context. pub(crate) fn handle(&self) -> Handle { - self.inner.clone() + self.handle.clone() } fn park_internal(&mut self, limit: Option) -> Result<(), P::Error> { let clock = &self.time_source.clock; - let mut lock = self.inner.lock(); + let mut lock = self.handle.get().state.lock(); - assert!(!lock.is_shutdown); + assert!(!self.handle.is_shutdown()); let next_wake = lock.wheel.next_expiration_time(); lock.next_wake = @@ -237,7 +244,7 @@ where } // Process pending timers after waking up - self.inner.process(); + self.handle.process(); Ok(()) } @@ -255,7 +262,7 @@ impl Handle { let mut waker_list: [Option; 32] = Default::default(); let mut waker_idx = 0; - let mut lock = self.lock(); + let mut lock = self.get().lock(); assert!(now >= lock.elapsed); @@ -278,7 +285,7 @@ impl Handle { waker_idx = 0; - lock = self.lock(); + lock = self.get().lock(); } } } @@ -309,7 +316,7 @@ impl Handle { /// `add_entry` must not be called concurrently. pub(self) unsafe fn clear_entry(&self, entry: NonNull) { unsafe { - let mut lock = self.lock(); + let mut lock = self.get().lock(); if entry.as_ref().might_be_registered() { lock.wheel.remove(entry); @@ -327,7 +334,7 @@ impl Handle { /// the `TimerEntry`) pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull) { let waker = unsafe { - let mut lock = self.lock(); + let mut lock = self.get().lock(); // We may have raced with a firing/deregistration, so check before // deregistering. @@ -338,7 +345,7 @@ impl Handle { // Now that we have exclusive control of this entry, mint a handle to reinsert it. let entry = entry.as_ref().handle(); - if lock.is_shutdown { + if self.is_shutdown() { unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) } } else { entry.set_expiration(new_tick); @@ -396,19 +403,15 @@ where } fn shutdown(&mut self) { - let mut lock = self.inner.lock(); - - if lock.is_shutdown { + if self.handle.is_shutdown() { return; } - lock.is_shutdown = true; - - drop(lock); + self.handle.get().is_shutdown.store(true, Ordering::SeqCst); // Advance time forward to the end of time. - self.inner.process_at_time(u64::MAX); + self.handle.process_at_time(u64::MAX); self.park.shutdown(); } @@ -428,14 +431,26 @@ where impl Inner { pub(self) fn new(time_source: ClockTime, unpark: Box) -> Self { Inner { - time_source, - elapsed: 0, - next_wake: None, - unpark, - wheel: wheel::Wheel::new(), - is_shutdown: false, + state: Mutex::new(InnerState { + time_source, + elapsed: 0, + next_wake: None, + unpark, + wheel: wheel::Wheel::new(), + }), + is_shutdown: AtomicBool::new(false), } } + + /// Locks the driver's inner structure + pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> { + self.state.lock() + } + + // Check whether the driver has been shutdown + pub(super) fn is_shutdown(&self) -> bool { + self.is_shutdown.load(Ordering::SeqCst) + } } impl fmt::Debug for Inner { diff --git a/tokio/src/time/driver/tests/mod.rs b/tokio/src/time/driver/tests/mod.rs index 8ae4a84b442..7c5cf1fd05c 100644 --- a/tokio/src/time/driver/tests/mod.rs +++ b/tokio/src/time/driver/tests/mod.rs @@ -3,7 +3,7 @@ use std::{task::Context, time::Duration}; #[cfg(not(loom))] use futures::task::noop_waker_ref; -use crate::loom::sync::{Arc, Mutex}; +use crate::loom::sync::Arc; use crate::loom::thread; use crate::{ loom::sync::atomic::{AtomicBool, Ordering}, @@ -45,7 +45,7 @@ fn single_timer() { let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); - let handle = Handle::new(Arc::new(Mutex::new(inner))); + let handle = Handle::new(Arc::new(inner)); let handle_ = handle.clone(); let jh = thread::spawn(move || { @@ -76,7 +76,7 @@ fn drop_timer() { let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); - let handle = Handle::new(Arc::new(Mutex::new(inner))); + let handle = Handle::new(Arc::new(inner)); let handle_ = handle.clone(); let jh = thread::spawn(move || { @@ -107,7 +107,7 @@ fn change_waker() { let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); - let handle = Handle::new(Arc::new(Mutex::new(inner))); + let handle = Handle::new(Arc::new(inner)); let handle_ = handle.clone(); let jh = thread::spawn(move || { @@ -142,7 +142,7 @@ fn reset_future() { let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); - let handle = Handle::new(Arc::new(Mutex::new(inner))); + let handle = Handle::new(Arc::new(inner)); let handle_ = handle.clone(); let finished_early_ = finished_early.clone(); @@ -191,7 +191,7 @@ fn poll_process_levels() { let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source, MockUnpark::mock()); - let handle = Handle::new(Arc::new(Mutex::new(inner))); + let handle = Handle::new(Arc::new(inner)); let mut entries = vec![]; @@ -232,7 +232,7 @@ fn poll_process_levels_targeted() { let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source, MockUnpark::mock()); - let handle = Handle::new(Arc::new(Mutex::new(inner))); + let handle = Handle::new(Arc::new(inner)); let e1 = TimerEntry::new(&handle, clock.now() + Duration::from_millis(193)); pin!(e1); diff --git a/tokio/src/util/error.rs b/tokio/src/util/error.rs index 518cb2c0aed..0e52364a704 100644 --- a/tokio/src/util/error.rs +++ b/tokio/src/util/error.rs @@ -1,3 +1,9 @@ /// Error string explaining that the Tokio context hasn't been instantiated. pub(crate) const CONTEXT_MISSING_ERROR: &str = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"; + +// some combinations of features might not use this +#[allow(dead_code)] +/// Error string explaining that the Tokio context is shutting down and cannot drive timers. +pub(crate) const RUNTIME_SHUTTING_DOWN_ERROR: &str = + "A Tokio 1.x context was found, but it is being shutdown."; diff --git a/tokio/tests/rt_handle_block_on.rs b/tokio/tests/rt_handle_block_on.rs new file mode 100644 index 00000000000..5234258be11 --- /dev/null +++ b/tokio/tests/rt_handle_block_on.rs @@ -0,0 +1,511 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +// All io tests that deal with shutdown is currently ignored because there are known bugs in with +// shutting down the io driver while concurrently registering new resources. See +// https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 fo more details. +// +// When this has been fixed we want to re-enable these tests. + +use std::time::Duration; +use tokio::runtime::{Handle, Runtime}; +use tokio::sync::mpsc; +use tokio::task::spawn_blocking; +use tokio::{fs, net, time}; + +macro_rules! multi_threaded_rt_test { + ($($t:tt)*) => { + mod threaded_scheduler_4_threads_only { + use super::*; + + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .enable_all() + .build() + .unwrap() + } + } + + mod threaded_scheduler_1_thread_only { + use super::*; + + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap() + } + } + } +} + +macro_rules! rt_test { + ($($t:tt)*) => { + mod current_thread_scheduler { + use super::*; + + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + } + } + + mod threaded_scheduler_4_threads { + use super::*; + + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .enable_all() + .build() + .unwrap() + } + } + + mod threaded_scheduler_1_thread { + use super::*; + + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap() + } + } + } +} + +// ==== runtime independent futures ====== + +#[test] +fn basic() { + test_with_runtimes(|| { + let one = Handle::current().block_on(async { 1 }); + assert_eq!(1, one); + }); +} + +#[test] +fn bounded_mpsc_channel() { + test_with_runtimes(|| { + let (tx, mut rx) = mpsc::channel(1024); + + Handle::current().block_on(tx.send(42)).unwrap(); + + let value = Handle::current().block_on(rx.recv()).unwrap(); + assert_eq!(value, 42); + }); +} + +#[test] +fn unbounded_mpsc_channel() { + test_with_runtimes(|| { + let (tx, mut rx) = mpsc::unbounded_channel(); + + let _ = tx.send(42); + + let value = Handle::current().block_on(rx.recv()).unwrap(); + assert_eq!(value, 42); + }) +} + +rt_test! { + // ==== spawn blocking futures ====== + + #[test] + fn basic_fs() { + let rt = rt(); + let _enter = rt.enter(); + + let contents = Handle::current() + .block_on(fs::read_to_string("Cargo.toml")) + .unwrap(); + assert!(contents.contains("Cargo.toml")); + } + + #[test] + fn fs_shutdown_before_started() { + let rt = rt(); + let _enter = rt.enter(); + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err: std::io::Error = Handle::current() + .block_on(fs::read_to_string("Cargo.toml")) + .unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + + let inner_err = err.get_ref().expect("no inner error"); + assert_eq!(inner_err.to_string(), "background task failed"); + } + + #[test] + fn basic_spawn_blocking() { + let rt = rt(); + let _enter = rt.enter(); + + let answer = Handle::current() + .block_on(spawn_blocking(|| { + std::thread::sleep(Duration::from_millis(100)); + 42 + })) + .unwrap(); + + assert_eq!(answer, 42); + } + + #[test] + fn spawn_blocking_after_shutdown_fails() { + let rt = rt(); + let _enter = rt.enter(); + rt.shutdown_timeout(Duration::from_secs(1000)); + + let join_err = Handle::current() + .block_on(spawn_blocking(|| { + std::thread::sleep(Duration::from_millis(100)); + 42 + })) + .unwrap_err(); + + assert!(join_err.is_cancelled()); + } + + #[test] + fn spawn_blocking_started_before_shutdown_continues() { + let rt = rt(); + let _enter = rt.enter(); + + let handle = spawn_blocking(|| { + std::thread::sleep(Duration::from_secs(1)); + 42 + }); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let answer = Handle::current().block_on(handle).unwrap(); + + assert_eq!(answer, 42); + } + + // ==== net ====== + + #[test] + fn tcp_listener_bind() { + let rt = rt(); + let _enter = rt.enter(); + + Handle::current() + .block_on(net::TcpListener::bind("127.0.0.1:0")) + .unwrap(); + } + + // All io tests are ignored for now. See above why that is. + #[ignore] + #[test] + fn tcp_listener_connect_after_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = Handle::current() + .block_on(net::TcpListener::bind("127.0.0.1:0")) + .unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + // All io tests are ignored for now. See above why that is. + #[ignore] + #[test] + fn tcp_listener_connect_before_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + let bind_future = net::TcpListener::bind("127.0.0.1:0"); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = Handle::current().block_on(bind_future).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + #[test] + fn udp_socket_bind() { + let rt = rt(); + let _enter = rt.enter(); + + Handle::current() + .block_on(net::UdpSocket::bind("127.0.0.1:0")) + .unwrap(); + } + + // All io tests are ignored for now. See above why that is. + #[ignore] + #[test] + fn udp_stream_bind_after_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = Handle::current() + .block_on(net::UdpSocket::bind("127.0.0.1:0")) + .unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + // All io tests are ignored for now. See above why that is. + #[ignore] + #[test] + fn udp_stream_bind_before_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + let bind_future = net::UdpSocket::bind("127.0.0.1:0"); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = Handle::current().block_on(bind_future).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + // All io tests are ignored for now. See above why that is. + #[ignore] + #[cfg(unix)] + #[test] + fn unix_listener_bind_after_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("socket"); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = net::UnixListener::bind(path).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + // All io tests are ignored for now. See above why that is. + #[ignore] + #[cfg(unix)] + #[test] + fn unix_listener_shutdown_after_bind() { + let rt = rt(); + let _enter = rt.enter(); + + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("socket"); + + let listener = net::UnixListener::bind(path).unwrap(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + // this should not timeout but fail immediately since the runtime has been shutdown + let err = Handle::current().block_on(listener.accept()).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone"); + } + + // All io tests are ignored for now. See above why that is. + #[ignore] + #[cfg(unix)] + #[test] + fn unix_listener_shutdown_after_accept() { + let rt = rt(); + let _enter = rt.enter(); + + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("socket"); + + let listener = net::UnixListener::bind(path).unwrap(); + + let accept_future = listener.accept(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + // this should not timeout but fail immediately since the runtime has been shutdown + let err = Handle::current().block_on(accept_future).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone"); + } + + // ==== nesting ====== + + #[test] + #[should_panic( + expected = "Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks." + )] + fn nesting() { + fn some_non_async_function() -> i32 { + Handle::current().block_on(time::sleep(Duration::from_millis(10))); + 1 + } + + let rt = rt(); + + rt.block_on(async { some_non_async_function() }); + } +} + +multi_threaded_rt_test! { + #[cfg(unix)] + #[test] + fn unix_listener_bind() { + let rt = rt(); + let _enter = rt.enter(); + + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("socket"); + + let listener = net::UnixListener::bind(path).unwrap(); + + // this should timeout and not fail immediately since the runtime has not been shutdown + let _: tokio::time::error::Elapsed = Handle::current() + .block_on(tokio::time::timeout( + Duration::from_millis(10), + listener.accept(), + )) + .unwrap_err(); + } + + // ==== timers ====== + + // `Handle::block_on` doesn't work with timer futures on a current thread runtime as there is no + // one to drive the timers so they will just hang forever. Therefore they are not tested. + + #[test] + fn sleep() { + let rt = rt(); + let _enter = rt.enter(); + + Handle::current().block_on(time::sleep(Duration::from_millis(100))); + } + + #[test] + #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")] + fn sleep_before_shutdown_panics() { + let rt = rt(); + let _enter = rt.enter(); + + let f = time::sleep(Duration::from_millis(100)); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + Handle::current().block_on(f); + } + + #[test] + #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")] + fn sleep_after_shutdown_panics() { + let rt = rt(); + let _enter = rt.enter(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + Handle::current().block_on(time::sleep(Duration::from_millis(100))); + } +} + +// ==== utils ====== + +/// Create a new multi threaded runtime +fn new_multi_thread(n: usize) -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(n) + .enable_all() + .build() + .unwrap() +} + +/// Create a new single threaded runtime +fn new_current_thread() -> Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() +} + +/// Utility to test things on both kinds of runtimes both before and after shutting it down. +fn test_with_runtimes(f: F) +where + F: Fn(), +{ + { + println!("current thread runtime"); + + let rt = new_current_thread(); + let _enter = rt.enter(); + f(); + + println!("current thread runtime after shutdown"); + rt.shutdown_timeout(Duration::from_secs(1000)); + f(); + } + + { + println!("multi thread (1 thread) runtime"); + + let rt = new_multi_thread(1); + let _enter = rt.enter(); + f(); + + println!("multi thread runtime after shutdown"); + rt.shutdown_timeout(Duration::from_secs(1000)); + f(); + } + + { + println!("multi thread (4 threads) runtime"); + + let rt = new_multi_thread(4); + let _enter = rt.enter(); + f(); + + println!("multi thread runtime after shutdown"); + rt.shutdown_timeout(Duration::from_secs(1000)); + f(); + } +}