From 762ad9b63f24e9754aee98e0d6eccd19959bb168 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 23 Feb 2021 12:51:50 -0800 Subject: [PATCH 01/19] rt: add Handle::block_on --- tokio/src/runtime/handle.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 5de23669b82..cfaf6de9556 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -202,6 +202,18 @@ impl Handle { let _ = self.blocking_spawner.spawn(task, &self); handle } + + /// TODO: write docs if this is a good direction + pub fn block_on(&self, future: F) -> F::Output { + // Enter the **runtime** context. This configures spawning, the current I/O driver, ... + let _rt_enter = self.enter(); + + // Enter a **blocking** context. This prevents blocking from a runtime. + let mut _blocking_enter = crate::runtime::enter(true); + + // Block on the future + _blocking_enter.block_on(future).expect("failed to park thread") + } } /// Error returned by `try_current` when no Runtime has been started From 3a70a815a92119ffa62c900da25f09513a42823b Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Sat, 13 Mar 2021 16:40:46 +0100 Subject: [PATCH 02/19] Start over with new requirements --- tokio/src/io/driver/mod.rs | 25 ++ tokio/src/io/driver/registration.rs | 11 +- tokio/src/runtime/handle.rs | 12 +- tokio/src/runtime/mod.rs | 2 +- tokio/src/time/driver/entry.rs | 4 + tokio/src/time/driver/handle.rs | 19 +- tokio/src/time/driver/mod.rs | 71 ++-- tokio/src/time/driver/tests/mod.rs | 14 +- tokio/src/util/error.rs | 4 + tokio/tests/rt_handle_block_on.rs | 491 ++++++++++++++++++++++++++++ 10 files changed, 607 insertions(+), 46 deletions(-) create mode 100644 tokio/tests/rt_handle_block_on.rs diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index fa2d4208c72..93f647d4273 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -14,6 +14,7 @@ pub(crate) use registration::Registration; mod scheduled_io; use scheduled_io::ScheduledIo; +use crate::loom::sync::atomic::{AtomicBool, Ordering}; use crate::park::{Park, Unpark}; use crate::util::slab::{self, Slab}; use crate::{loom::sync::Mutex, util::bit}; @@ -73,6 +74,9 @@ pub(super) struct Inner { /// Used to wake up the reactor from a call to `turn` waker: mio::Waker, + + /// Whether the driver is shutdown. + is_shutdown: AtomicBool, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] @@ -129,6 +133,7 @@ impl Driver { registry, io_dispatch: allocator, waker, + is_shutdown: AtomicBool::new(false), }), }) } @@ -208,6 +213,8 @@ impl Drop for Driver { impl Drop for Inner { fn drop(&mut self) { + self.is_shutdown.store(true, Ordering::SeqCst); + let resources = self.resources.lock().take(); if let Some(mut slab) = resources { @@ -297,6 +304,24 @@ impl Handle { pub(super) fn inner(&self) -> Option> { self.inner.upgrade() } + + pub(crate) fn shutdown(self) { + if let Some(inner) = self.inner.upgrade() { + inner + .is_shutdown + .store(true, crate::loom::sync::atomic::Ordering::SeqCst); + } + } + + pub(crate) fn is_shutdown(&self) -> bool { + if let Some(inner) = self.inner.upgrade() { + inner.is_shutdown.load(Ordering::SeqCst) + } else { + // if the inner type has been dropped then its `Drop` impl will have been called which + // sets `Inner.is_shutdown` to `true`. So therefore it must have been shutdown. + true + } + } } impl Unpark for Handle { diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index db911b3e1aa..3f52d5f42bd 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -56,7 +56,7 @@ unsafe impl Sync for Registration {} impl Registration { /// Registers the I/O resource with the default reactor, for a specific - /// `Interest`. `new_with_interest` should be used over `new` when you need + /// `Interest`. `new_with_interest` should be ucrate::util::error::RUNTIME_SHUTTING_DOWN_ERRORsed over `new` when you need /// control over the readiness state, such as when a file descriptor only /// allows reads. This does not add `hup` or `error` so if you are /// interested in those states, you will need to add them to the readiness @@ -71,6 +71,13 @@ impl Registration { interest: Interest, handle: Handle, ) -> io::Result { + if handle.is_shutdown() { + return Err(io::Error::new( + io::ErrorKind::Other, + crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, + )); + } + let shared = if let Some(inner) = handle.inner() { inner.add_source(io, interest)? } else { @@ -234,7 +241,7 @@ cfg_io_readiness! { pin!(fut); crate::future::poll_fn(|cx| { - if self.handle.inner().is_none() { + if self.handle.inner().is_none() || self.handle.is_shutdown() { return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone"))); } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index cfaf6de9556..ba64d9a30f2 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -212,7 +212,17 @@ impl Handle { let mut _blocking_enter = crate::runtime::enter(true); // Block on the future - _blocking_enter.block_on(future).expect("failed to park thread") + _blocking_enter + .block_on(future) + .expect("failed to park thread") + } + + pub(crate) fn shutdown(mut self) { + self.spawner.shutdown(); + + if let Some(io_handle) = self.io_handle { + io_handle.shutdown(); + } } } diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index b138a66455c..2075d487720 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -526,7 +526,7 @@ cfg_rt! { /// ``` pub fn shutdown_timeout(mut self, duration: Duration) { // Wakeup and shutdown all the worker threads - self.handle.spawner.shutdown(); + self.handle.shutdown(); self.blocking_pool.shutdown(Some(duration)); } diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs index 11366d2ada3..e840956c146 100644 --- a/tokio/src/time/driver/entry.rs +++ b/tokio/src/time/driver/entry.rs @@ -543,6 +543,10 @@ impl TimerEntry { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { + if self.driver.is_shutdown() { + panic!(crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR); + } + if let Some(deadline) = self.initial_deadline { self.as_mut().reset(deadline); } diff --git a/tokio/src/time/driver/handle.rs b/tokio/src/time/driver/handle.rs index e934b562be4..00f648e6f20 100644 --- a/tokio/src/time/driver/handle.rs +++ b/tokio/src/time/driver/handle.rs @@ -1,4 +1,4 @@ -use crate::loom::sync::{Arc, Mutex}; +use crate::loom::sync::Arc; use crate::time::driver::ClockTime; use std::fmt; @@ -6,13 +6,13 @@ use std::fmt; #[derive(Clone)] pub(crate) struct Handle { time_source: ClockTime, - inner: Arc>, + inner: Arc, } impl Handle { /// Creates a new timer `Handle` from a shared `Inner` timer state. - pub(super) fn new(inner: Arc>) -> Self { - let time_source = inner.lock().time_source.clone(); + pub(super) fn new(inner: Arc) -> Self { + let time_source = inner.state.lock().time_source.clone(); Handle { time_source, inner } } @@ -21,9 +21,14 @@ impl Handle { &self.time_source } - /// Locks the driver's inner structure - pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, super::Inner> { - self.inner.lock() + /// Access the driver's inner structure + pub(super) fn get(&self) -> &super::Inner { + &*self.inner + } + + // Check whether the driver has been shutdown + pub(super) fn is_shutdown(&self) -> bool { + self.inner.is_shutdown() } } diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 615307ea572..3eb100412fb 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -16,6 +16,7 @@ mod wheel; pub(super) mod sleep; +use crate::loom::sync::atomic::{AtomicBool, Ordering}; use crate::loom::sync::{Arc, Mutex}; use crate::park::{Park, Unpark}; use crate::time::error::Error; @@ -86,7 +87,7 @@ pub(crate) struct Driver { time_source: ClockTime, /// Shared state - inner: Handle, + handle: Handle, /// Parker to delegate to park: P, @@ -132,7 +133,16 @@ impl ClockTime { } /// Timer state shared between `Driver`, `Handle`, and `Registration`. -pub(self) struct Inner { +struct Inner { + // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex + pub(super) state: Mutex, + + /// True if the driver is being shutdown + pub(super) is_shutdown: AtomicBool, +} + +/// Time state shared which must be protected by a `Mutex` +struct InnerState { /// Timing backend in use time_source: ClockTime, @@ -145,9 +155,6 @@ pub(self) struct Inner { /// Timer wheel wheel: wheel::Wheel, - /// True if the driver is being shutdown - is_shutdown: bool, - /// Unparker that can be used to wake the time driver unpark: Box, } @@ -169,7 +176,7 @@ where Driver { time_source, - inner: Handle::new(Arc::new(Mutex::new(inner))), + handle: Handle::new(Arc::new(inner)), park, } } @@ -181,15 +188,15 @@ where /// `with_default`, setting the timer as the default timer for the execution /// context. pub(crate) fn handle(&self) -> Handle { - self.inner.clone() + self.handle.clone() } fn park_internal(&mut self, limit: Option) -> Result<(), P::Error> { let clock = &self.time_source.clock; - let mut lock = self.inner.lock(); + let mut lock = self.handle.get().state.lock(); - assert!(!lock.is_shutdown); + assert!(!self.handle.is_shutdown()); let next_wake = lock.wheel.next_expiration_time(); lock.next_wake = @@ -237,7 +244,7 @@ where } // Process pending timers after waking up - self.inner.process(); + self.handle.process(); Ok(()) } @@ -255,7 +262,7 @@ impl Handle { let mut waker_list: [Option; 32] = Default::default(); let mut waker_idx = 0; - let mut lock = self.lock(); + let mut lock = self.get().lock(); assert!(now >= lock.elapsed); @@ -278,7 +285,7 @@ impl Handle { waker_idx = 0; - lock = self.lock(); + lock = self.get().lock(); } } } @@ -309,7 +316,7 @@ impl Handle { /// `add_entry` must not be called concurrently. pub(self) unsafe fn clear_entry(&self, entry: NonNull) { unsafe { - let mut lock = self.lock(); + let mut lock = self.get().lock(); if entry.as_ref().might_be_registered() { lock.wheel.remove(entry); @@ -327,7 +334,7 @@ impl Handle { /// the `TimerEntry`) pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull) { let waker = unsafe { - let mut lock = self.lock(); + let mut lock = self.get().lock(); // We may have raced with a firing/deregistration, so check before // deregistering. @@ -338,7 +345,7 @@ impl Handle { // Now that we have exclusive control of this entry, mint a handle to reinsert it. let entry = entry.as_ref().handle(); - if lock.is_shutdown { + if self.is_shutdown() { unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) } } else { entry.set_expiration(new_tick); @@ -396,19 +403,15 @@ where } fn shutdown(&mut self) { - let mut lock = self.inner.lock(); - - if lock.is_shutdown { + if self.handle.is_shutdown() { return; } - lock.is_shutdown = true; - - drop(lock); + self.handle.get().is_shutdown.store(true, Ordering::SeqCst); // Advance time forward to the end of time. - self.inner.process_at_time(u64::MAX); + self.handle.process_at_time(u64::MAX); self.park.shutdown(); } @@ -428,14 +431,26 @@ where impl Inner { pub(self) fn new(time_source: ClockTime, unpark: Box) -> Self { Inner { - time_source, - elapsed: 0, - next_wake: None, - unpark, - wheel: wheel::Wheel::new(), - is_shutdown: false, + state: Mutex::new(InnerState { + time_source, + elapsed: 0, + next_wake: None, + unpark, + wheel: wheel::Wheel::new(), + }), + is_shutdown: AtomicBool::new(false), } } + + /// Locks the driver's inner structure + pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> { + self.state.lock() + } + + // Check whether the driver has been shutdown + pub(super) fn is_shutdown(&self) -> bool { + self.is_shutdown.load(Ordering::SeqCst) + } } impl fmt::Debug for Inner { diff --git a/tokio/src/time/driver/tests/mod.rs b/tokio/src/time/driver/tests/mod.rs index 8ae4a84b442..7c5cf1fd05c 100644 --- a/tokio/src/time/driver/tests/mod.rs +++ b/tokio/src/time/driver/tests/mod.rs @@ -3,7 +3,7 @@ use std::{task::Context, time::Duration}; #[cfg(not(loom))] use futures::task::noop_waker_ref; -use crate::loom::sync::{Arc, Mutex}; +use crate::loom::sync::Arc; use crate::loom::thread; use crate::{ loom::sync::atomic::{AtomicBool, Ordering}, @@ -45,7 +45,7 @@ fn single_timer() { let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); - let handle = Handle::new(Arc::new(Mutex::new(inner))); + let handle = Handle::new(Arc::new(inner)); let handle_ = handle.clone(); let jh = thread::spawn(move || { @@ -76,7 +76,7 @@ fn drop_timer() { let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); - let handle = Handle::new(Arc::new(Mutex::new(inner))); + let handle = Handle::new(Arc::new(inner)); let handle_ = handle.clone(); let jh = thread::spawn(move || { @@ -107,7 +107,7 @@ fn change_waker() { let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); - let handle = Handle::new(Arc::new(Mutex::new(inner))); + let handle = Handle::new(Arc::new(inner)); let handle_ = handle.clone(); let jh = thread::spawn(move || { @@ -142,7 +142,7 @@ fn reset_future() { let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); - let handle = Handle::new(Arc::new(Mutex::new(inner))); + let handle = Handle::new(Arc::new(inner)); let handle_ = handle.clone(); let finished_early_ = finished_early.clone(); @@ -191,7 +191,7 @@ fn poll_process_levels() { let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source, MockUnpark::mock()); - let handle = Handle::new(Arc::new(Mutex::new(inner))); + let handle = Handle::new(Arc::new(inner)); let mut entries = vec![]; @@ -232,7 +232,7 @@ fn poll_process_levels_targeted() { let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source, MockUnpark::mock()); - let handle = Handle::new(Arc::new(Mutex::new(inner))); + let handle = Handle::new(Arc::new(inner)); let e1 = TimerEntry::new(&handle, clock.now() + Duration::from_millis(193)); pin!(e1); diff --git a/tokio/src/util/error.rs b/tokio/src/util/error.rs index 518cb2c0aed..83375a98991 100644 --- a/tokio/src/util/error.rs +++ b/tokio/src/util/error.rs @@ -1,3 +1,7 @@ /// Error string explaining that the Tokio context hasn't been instantiated. pub(crate) const CONTEXT_MISSING_ERROR: &str = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"; + +/// Error string explaining that the Tokio context is shutting down and cannot drive timers. +pub(crate) const RUNTIME_SHUTTING_DOWN_ERROR: &str = + "A Tokio 1.x context was found, but it is being shutdown."; diff --git a/tokio/tests/rt_handle_block_on.rs b/tokio/tests/rt_handle_block_on.rs new file mode 100644 index 00000000000..ed2ceab0075 --- /dev/null +++ b/tokio/tests/rt_handle_block_on.rs @@ -0,0 +1,491 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use std::time::Duration; +use tempfile::tempdir; +use tokio::runtime::{Handle, Runtime}; +use tokio::sync::mpsc; +use tokio::task::spawn_blocking; +use tokio::{fs, net, time}; + +macro_rules! multi_threaded_rt_test { + ($($t:tt)*) => { + mod threaded_scheduler_4_threads_only { + use super::*; + + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .enable_all() + .build() + .unwrap() + } + } + + mod threaded_scheduler_1_thread_only { + use super::*; + + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap() + } + } + } +} + +macro_rules! rt_test { + ($($t:tt)*) => { + mod current_thread_scheduler { + use super::*; + + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + } + } + + mod threaded_scheduler_4_threads { + use super::*; + + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .enable_all() + .build() + .unwrap() + } + } + + mod threaded_scheduler_1_thread { + use super::*; + + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap() + } + } + } +} + +// ==== runtime independent futures ====== + +#[test] +fn basic() { + test_with_runtimes(|| { + let one = Handle::current().block_on(async { 1 }); + assert_eq!(1, one); + }); +} + +#[test] +fn bounded_mpsc_channel() { + test_with_runtimes(|| { + let (tx, mut rx) = mpsc::channel(1024); + + Handle::current().block_on(tx.send(42)).unwrap(); + + let value = Handle::current().block_on(rx.recv()).unwrap(); + assert_eq!(value, 42); + }); +} + +#[test] +fn unbounded_mpsc_channel() { + test_with_runtimes(|| { + let (tx, mut rx) = mpsc::unbounded_channel(); + + let _ = tx.send(42); + + let value = Handle::current().block_on(rx.recv()).unwrap(); + assert_eq!(value, 42); + }) +} + +rt_test! { + // ==== spawn blocking futures ====== + + #[test] + fn basic_fs() { + let rt = rt(); + let _enter = rt.enter(); + + let contents = Handle::current() + .block_on(fs::read_to_string("Cargo.toml")) + .unwrap(); + assert!(contents.contains("Cargo.toml")); + } + + #[test] + fn fs_shutdown_before_started() { + let rt = rt(); + let _enter = rt.enter(); + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err: std::io::Error = Handle::current() + .block_on(fs::read_to_string("Cargo.toml")) + .unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + + let inner_err = err.get_ref().expect("no inner error"); + assert_eq!(inner_err.to_string(), "background task failed"); + } + + #[test] + fn basic_spawn_blocking() { + let rt = rt(); + let _enter = rt.enter(); + + let answer = Handle::current() + .block_on(spawn_blocking(|| { + std::thread::sleep(Duration::from_millis(100)); + 42 + })) + .unwrap(); + + assert_eq!(answer, 42); + } + + #[test] + fn spawn_blocking_after_shutdown_fails() { + let rt = rt(); + let _enter = rt.enter(); + rt.shutdown_timeout(Duration::from_secs(1000)); + + let join_err = Handle::current() + .block_on(spawn_blocking(|| { + std::thread::sleep(Duration::from_millis(100)); + 42 + })) + .unwrap_err(); + + assert!(join_err.is_cancelled()); + } + + #[test] + fn spawn_blocking_started_before_shutdown_continues() { + let rt = rt(); + let _enter = rt.enter(); + + let handle = spawn_blocking(|| { + std::thread::sleep(Duration::from_secs(1)); + 42 + }); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let answer = Handle::current().block_on(handle).unwrap(); + + assert_eq!(answer, 42); + } + + // ==== net ====== + + #[test] + fn tcp_listener_bind() { + let rt = rt(); + let _enter = rt.enter(); + + Handle::current() + .block_on(net::TcpListener::bind("127.0.0.1:0")) + .unwrap(); + } + + #[test] + fn tcp_listener_connect_after_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = Handle::current() + .block_on(net::TcpListener::bind("127.0.0.1:0")) + .unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + #[test] + fn tcp_listener_connect_before_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + let bind_future = net::TcpListener::bind("127.0.0.1:0"); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = Handle::current().block_on(bind_future).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + #[test] + fn udp_socket_bind() { + let rt = rt(); + let _enter = rt.enter(); + + Handle::current() + .block_on(net::UdpSocket::bind("127.0.0.1:0")) + .unwrap(); + } + + #[test] + fn udp_stream_bind_after_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = Handle::current() + .block_on(net::UdpSocket::bind("127.0.0.1:0")) + .unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + #[test] + fn udp_stream_bind_before_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + let bind_future = net::UdpSocket::bind("127.0.0.1:0"); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = Handle::current().block_on(bind_future).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + #[cfg(unix)] + #[test] + fn unix_listener_bind_after_shutdown() { + let rt = rt(); + let _enter = rt.enter(); + + let dir = tempdir().unwrap(); + let path = dir.path().join("socket"); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + let err = net::UnixListener::bind(path).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!( + err.get_ref().unwrap().to_string(), + "A Tokio 1.x context was found, but it is being shutdown.", + ); + } + + #[cfg(unix)] + #[test] + fn unix_listener_bind_accept_after_shutdown_1() { + let rt = rt(); + let _enter = rt.enter(); + + let dir = tempdir().unwrap(); + let path = dir.path().join("socket"); + + let listener = net::UnixListener::bind(path).unwrap(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + // this should not timeout but fail immediately since the runtime has been shutdown + let err = Handle::current().block_on(listener.accept()).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone"); + } + + #[cfg(unix)] + #[test] + fn unix_listener_bind_accept_after_shutdown_2() { + let rt = rt(); + let _enter = rt.enter(); + + let dir = tempdir().unwrap(); + let path = dir.path().join("socket"); + + let listener = net::UnixListener::bind(path).unwrap(); + + let accept_future = listener.accept(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + // this should not timeout but fail immediately since the runtime has been shutdown + let err = Handle::current().block_on(accept_future).unwrap_err(); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone"); + } + + // ==== nesting ====== + + #[test] + #[should_panic( + expected = "Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks." + )] + fn nesting() { + fn some_non_async_function() -> i32 { + Handle::current().block_on(time::sleep(Duration::from_millis(10))); + 1 + } + + let rt = rt(); + + rt.block_on(async { some_non_async_function() }); + } +} + +multi_threaded_rt_test! { + #[test] + fn unix_listener_bind() { + let rt = rt(); + let _enter = rt.enter(); + + let dir = tempdir().unwrap(); + let path = dir.path().join("socket"); + + let listener = net::UnixListener::bind(path).unwrap(); + + // this should timeout and not fail immediately since the runtime has not been shutdown + let _: tokio::time::error::Elapsed = Handle::current() + .block_on(tokio::time::timeout( + Duration::from_millis(10), + listener.accept(), + )) + .unwrap_err(); + } + + // ==== timers ====== + + // `Handle::block_on` doesn't work with timer futures on a current thread runtime as there is no + // one to drive the timers so they will just hang forever. Therefore they are not tested. + + #[test] + fn sleep() { + let rt = rt(); + let _enter = rt.enter(); + + Handle::current().block_on(time::sleep(Duration::from_millis(100))); + } + + #[test] + #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")] + fn sleep_before_shutdown_panics() { + let rt = rt(); + let _enter = rt.enter(); + + let f = time::sleep(Duration::from_millis(100)); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + Handle::current().block_on(f); + } + + #[test] + #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")] + fn sleep_after_shutdown_panics() { + let rt = rt(); + let _enter = rt.enter(); + + rt.shutdown_timeout(Duration::from_secs(1000)); + + Handle::current().block_on(time::sleep(Duration::from_millis(100))); + } +} + +// ==== utils ====== + +/// Create a new multi threaded runtime +fn new_multi_thread(n: usize) -> Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(n) + .enable_all() + .build() + .unwrap() +} + +/// Create a new single threaded runtime +fn new_current_thread() -> Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() +} + +/// Utility to test things on both kinds of runtimes both before and after shutting it down. +fn test_with_runtimes(f: F) +where + F: Fn(), +{ + { + println!("current thread runtime"); + + let rt = new_current_thread(); + let _enter = rt.enter(); + f(); + + println!("current thread runtime after shutdown"); + rt.shutdown_timeout(Duration::from_secs(1000)); + f(); + } + + { + println!("multi thread (1 thread) runtime"); + + let rt = new_multi_thread(1); + let _enter = rt.enter(); + f(); + + println!("multi thread runtime after shutdown"); + rt.shutdown_timeout(Duration::from_secs(1000)); + f(); + } + + { + println!("multi thread (4 threads) runtime"); + + let rt = new_multi_thread(4); + let _enter = rt.enter(); + f(); + + println!("multi thread runtime after shutdown"); + rt.shutdown_timeout(Duration::from_secs(1000)); + f(); + } +} From b55798abd508732d90113f0d7d0fb4cc45d95e5c Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Sun, 14 Mar 2021 00:07:44 +0100 Subject: [PATCH 03/19] Fix for some features not being enabled --- tokio/src/runtime/handle.rs | 16 ++++++++++++---- tokio/src/util/error.rs | 2 ++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index ba64d9a30f2..f56965c9a9a 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -217,11 +217,19 @@ impl Handle { .expect("failed to park thread") } - pub(crate) fn shutdown(mut self) { - self.spawner.shutdown(); + cfg_io_driver! { + pub(crate) fn shutdown(mut self) { + self.spawner.shutdown(); - if let Some(io_handle) = self.io_handle { - io_handle.shutdown(); + if let Some(io_handle) = self.io_handle { + io_handle.shutdown(); + } + } + } + + cfg_not_io_driver! { + pub(crate) fn shutdown(mut self) { + self.spawner.shutdown(); } } } diff --git a/tokio/src/util/error.rs b/tokio/src/util/error.rs index 83375a98991..0e52364a704 100644 --- a/tokio/src/util/error.rs +++ b/tokio/src/util/error.rs @@ -2,6 +2,8 @@ pub(crate) const CONTEXT_MISSING_ERROR: &str = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"; +// some combinations of features might not use this +#[allow(dead_code)] /// Error string explaining that the Tokio context is shutting down and cannot drive timers. pub(crate) const RUNTIME_SHUTTING_DOWN_ERROR: &str = "A Tokio 1.x context was found, but it is being shutdown."; From c157cf13ea0c0ba49f5c8c487836badc2178358a Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Sun, 14 Mar 2021 00:26:27 +0100 Subject: [PATCH 04/19] Don't test unix things on windows --- tokio/tests/rt_handle_block_on.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio/tests/rt_handle_block_on.rs b/tokio/tests/rt_handle_block_on.rs index ed2ceab0075..3486a684635 100644 --- a/tokio/tests/rt_handle_block_on.rs +++ b/tokio/tests/rt_handle_block_on.rs @@ -372,6 +372,7 @@ rt_test! { } multi_threaded_rt_test! { + #[cfg(unix)] #[test] fn unix_listener_bind() { let rt = rt(); From bba8cf03ebe38ebff4c3c4fdf90a58459b70f0b2 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Sun, 14 Mar 2021 00:31:22 +0100 Subject: [PATCH 05/19] Remove unused import --- tokio/tests/rt_handle_block_on.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tokio/tests/rt_handle_block_on.rs b/tokio/tests/rt_handle_block_on.rs index 3486a684635..17e65f10a20 100644 --- a/tokio/tests/rt_handle_block_on.rs +++ b/tokio/tests/rt_handle_block_on.rs @@ -2,7 +2,6 @@ #![cfg(feature = "full")] use std::time::Duration; -use tempfile::tempdir; use tokio::runtime::{Handle, Runtime}; use tokio::sync::mpsc; use tokio::task::spawn_blocking; @@ -297,7 +296,7 @@ rt_test! { let rt = rt(); let _enter = rt.enter(); - let dir = tempdir().unwrap(); + let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("socket"); rt.shutdown_timeout(Duration::from_secs(1000)); @@ -317,7 +316,7 @@ rt_test! { let rt = rt(); let _enter = rt.enter(); - let dir = tempdir().unwrap(); + let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("socket"); let listener = net::UnixListener::bind(path).unwrap(); @@ -337,7 +336,7 @@ rt_test! { let rt = rt(); let _enter = rt.enter(); - let dir = tempdir().unwrap(); + let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("socket"); let listener = net::UnixListener::bind(path).unwrap(); @@ -378,7 +377,7 @@ multi_threaded_rt_test! { let rt = rt(); let _enter = rt.enter(); - let dir = tempdir().unwrap(); + let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("socket"); let listener = net::UnixListener::bind(path).unwrap(); From d6a99aa6ce93fecc9731d504626f8373c963a9be Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Mon, 15 Mar 2021 20:06:35 +0100 Subject: [PATCH 06/19] Fix typo --- tokio/src/io/driver/registration.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index 3f52d5f42bd..577ffa889f4 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -56,7 +56,7 @@ unsafe impl Sync for Registration {} impl Registration { /// Registers the I/O resource with the default reactor, for a specific - /// `Interest`. `new_with_interest` should be ucrate::util::error::RUNTIME_SHUTTING_DOWN_ERRORsed over `new` when you need + /// `Interest`. `new_with_interest` should be used over `new` when you need /// control over the readiness state, such as when a file descriptor only /// allows reads. This does not add `hup` or `error` so if you are /// interested in those states, you will need to add them to the readiness From cc592b36191a170c3c21453d312ad3b59f84e9a7 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Mon, 15 Mar 2021 22:33:55 +0100 Subject: [PATCH 07/19] Only upgrade Arc twice --- tokio/src/io/driver/mod.rs | 6 +++++- tokio/src/io/driver/registration.rs | 3 ++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 93f647d4273..32f24f39ca6 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -315,7 +315,7 @@ impl Handle { pub(crate) fn is_shutdown(&self) -> bool { if let Some(inner) = self.inner.upgrade() { - inner.is_shutdown.load(Ordering::SeqCst) + inner.is_shutdown() } else { // if the inner type has been dropped then its `Drop` impl will have been called which // sets `Inner.is_shutdown` to `true`. So therefore it must have been shutdown. @@ -366,6 +366,10 @@ impl Inner { pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> { self.registry.deregister(source) } + + pub(super) fn is_shutdown(&self) -> bool { + self.is_shutdown.load(Ordering::SeqCst) + } } impl Direction { diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index 577ffa889f4..29e76f7a257 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -241,7 +241,8 @@ cfg_io_readiness! { pin!(fut); crate::future::poll_fn(|cx| { - if self.handle.inner().is_none() || self.handle.is_shutdown() { + let inner = self.handle.inner(); + if inner.is_none() || inner.filter(|i| i.is_shutdown()).is_some() { return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone"))); } From f87eac24bc8357792568df56ca1faeff1a712fbc Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Tue, 16 Mar 2021 09:48:09 +0100 Subject: [PATCH 08/19] Ignore io+shutdown tests for now due to known bugs See https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 for more details --- tokio/src/io/driver/mod.rs | 29 ----------------------------- tokio/src/io/driver/registration.rs | 10 +--------- tokio/src/runtime/handle.rs | 16 ++-------------- tokio/tests/rt_handle_block_on.rs | 24 ++++++++++++++++++++++-- 4 files changed, 25 insertions(+), 54 deletions(-) diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 32f24f39ca6..fa2d4208c72 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -14,7 +14,6 @@ pub(crate) use registration::Registration; mod scheduled_io; use scheduled_io::ScheduledIo; -use crate::loom::sync::atomic::{AtomicBool, Ordering}; use crate::park::{Park, Unpark}; use crate::util::slab::{self, Slab}; use crate::{loom::sync::Mutex, util::bit}; @@ -74,9 +73,6 @@ pub(super) struct Inner { /// Used to wake up the reactor from a call to `turn` waker: mio::Waker, - - /// Whether the driver is shutdown. - is_shutdown: AtomicBool, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] @@ -133,7 +129,6 @@ impl Driver { registry, io_dispatch: allocator, waker, - is_shutdown: AtomicBool::new(false), }), }) } @@ -213,8 +208,6 @@ impl Drop for Driver { impl Drop for Inner { fn drop(&mut self) { - self.is_shutdown.store(true, Ordering::SeqCst); - let resources = self.resources.lock().take(); if let Some(mut slab) = resources { @@ -304,24 +297,6 @@ impl Handle { pub(super) fn inner(&self) -> Option> { self.inner.upgrade() } - - pub(crate) fn shutdown(self) { - if let Some(inner) = self.inner.upgrade() { - inner - .is_shutdown - .store(true, crate::loom::sync::atomic::Ordering::SeqCst); - } - } - - pub(crate) fn is_shutdown(&self) -> bool { - if let Some(inner) = self.inner.upgrade() { - inner.is_shutdown() - } else { - // if the inner type has been dropped then its `Drop` impl will have been called which - // sets `Inner.is_shutdown` to `true`. So therefore it must have been shutdown. - true - } - } } impl Unpark for Handle { @@ -366,10 +341,6 @@ impl Inner { pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> { self.registry.deregister(source) } - - pub(super) fn is_shutdown(&self) -> bool { - self.is_shutdown.load(Ordering::SeqCst) - } } impl Direction { diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index 29e76f7a257..db911b3e1aa 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -71,13 +71,6 @@ impl Registration { interest: Interest, handle: Handle, ) -> io::Result { - if handle.is_shutdown() { - return Err(io::Error::new( - io::ErrorKind::Other, - crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, - )); - } - let shared = if let Some(inner) = handle.inner() { inner.add_source(io, interest)? } else { @@ -241,8 +234,7 @@ cfg_io_readiness! { pin!(fut); crate::future::poll_fn(|cx| { - let inner = self.handle.inner(); - if inner.is_none() || inner.filter(|i| i.is_shutdown()).is_some() { + if self.handle.inner().is_none() { return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone"))); } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index f56965c9a9a..5272d55556e 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -217,20 +217,8 @@ impl Handle { .expect("failed to park thread") } - cfg_io_driver! { - pub(crate) fn shutdown(mut self) { - self.spawner.shutdown(); - - if let Some(io_handle) = self.io_handle { - io_handle.shutdown(); - } - } - } - - cfg_not_io_driver! { - pub(crate) fn shutdown(mut self) { - self.spawner.shutdown(); - } + pub(crate) fn shutdown(mut self) { + self.spawner.shutdown(); } } diff --git a/tokio/tests/rt_handle_block_on.rs b/tokio/tests/rt_handle_block_on.rs index 17e65f10a20..5234258be11 100644 --- a/tokio/tests/rt_handle_block_on.rs +++ b/tokio/tests/rt_handle_block_on.rs @@ -1,6 +1,12 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +// All io tests that deal with shutdown is currently ignored because there are known bugs in with +// shutting down the io driver while concurrently registering new resources. See +// https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 fo more details. +// +// When this has been fixed we want to re-enable these tests. + use std::time::Duration; use tokio::runtime::{Handle, Runtime}; use tokio::sync::mpsc; @@ -208,6 +214,8 @@ rt_test! { .unwrap(); } + // All io tests are ignored for now. See above why that is. + #[ignore] #[test] fn tcp_listener_connect_after_shutdown() { let rt = rt(); @@ -226,6 +234,8 @@ rt_test! { ); } + // All io tests are ignored for now. See above why that is. + #[ignore] #[test] fn tcp_listener_connect_before_shutdown() { let rt = rt(); @@ -254,6 +264,8 @@ rt_test! { .unwrap(); } + // All io tests are ignored for now. See above why that is. + #[ignore] #[test] fn udp_stream_bind_after_shutdown() { let rt = rt(); @@ -272,6 +284,8 @@ rt_test! { ); } + // All io tests are ignored for now. See above why that is. + #[ignore] #[test] fn udp_stream_bind_before_shutdown() { let rt = rt(); @@ -290,6 +304,8 @@ rt_test! { ); } + // All io tests are ignored for now. See above why that is. + #[ignore] #[cfg(unix)] #[test] fn unix_listener_bind_after_shutdown() { @@ -310,9 +326,11 @@ rt_test! { ); } + // All io tests are ignored for now. See above why that is. + #[ignore] #[cfg(unix)] #[test] - fn unix_listener_bind_accept_after_shutdown_1() { + fn unix_listener_shutdown_after_bind() { let rt = rt(); let _enter = rt.enter(); @@ -330,9 +348,11 @@ rt_test! { assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone"); } + // All io tests are ignored for now. See above why that is. + #[ignore] #[cfg(unix)] #[test] - fn unix_listener_bind_accept_after_shutdown_2() { + fn unix_listener_shutdown_after_accept() { let rt = rt(); let _enter = rt.enter(); From 6f6dca2070c03f278f93014164e51b4a6330569c Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Tue, 16 Mar 2021 09:50:26 +0100 Subject: [PATCH 09/19] Improve error message when runtime is shutting down --- tokio/src/io/driver/registration.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index db911b3e1aa..8251fe68a00 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -235,7 +235,10 @@ cfg_io_readiness! { crate::future::poll_fn(|cx| { if self.handle.inner().is_none() { - return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone"))); + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::Other, + crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR + ))); } Pin::new(&mut fut).poll(cx).map(Ok) From ea59cc5d1b5cb25adc3f4faf33635170c59aec5d Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Tue, 16 Mar 2021 11:32:11 +0100 Subject: [PATCH 10/19] Initial docs for `Handle::block_on` --- tokio/src/runtime/handle.rs | 100 +++++++++++++++++++++++++++++++++++- 1 file changed, 99 insertions(+), 1 deletion(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 5272d55556e..016fd85e2a9 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -203,7 +203,105 @@ impl Handle { handle } - /// TODO: write docs if this is a good direction + /// Run a future to completion on this `Handle`'s associated `Runtime`. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. + /// + /// The behavior for multi threaded vs current thread schedulers is the same + /// as [`Runtime::block_on`]. See the docs of [`Runtime::block_on`] for more + /// details. + /// + /// # If the runtime has been shutdown + /// + /// If the `Handle`'s associated `Runtime` has been shutdown, through + /// [`Runtime::shutdown_background`] or [`Runtime::shutdown_timeout`], and + /// `Handle::block_on` is used it might return an error or panic. The exact + /// behavior depends on the types of futures used. + /// + /// ## Runtime independent futures + /// + /// Runtime independent futures will run as normal. They are not affected by + /// shutdown. This includes, but is not limited to, channels, signals, and + /// basic futures that don't actually `await` anything. + /// + /// ## [`spawn_blocking`] futures + /// + /// Futures created with [`spawn_blocking`] will run if they were started + /// before the runtime was shutdown. If they were created after the runtime + /// was shutdown they will get cancelled and the [`JoinHandle`] will return + /// a [`JoinError`]. + /// + /// ## File system futures + /// + /// File system futures created by something in [`tokio::fs`] behave + /// similarly to [`spawn_blocking`] futures. They will run if started before + /// shutdown but fail with an error if started after shutdown. + /// + /// ## I/O future + /// + /// I/O futures created by something in [`tokio::net`] will return an error + /// regardless if the runtime was shutdown before or after the future was + /// created. + /// + /// ## Timer futures + /// + /// Timer futures created by something in [`tokio::time`] will panic if the + /// runtime has been shutdown. This is because the function signatures don't + /// allow returning errors. + /// + /// # Panics + /// + /// This function panics if the provided future panics, if called within an + /// asynchronous execution context, or if a timer future is executed on a + /// runtime that has been shutdown. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::runtime::Runtime; + /// + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Get a handle from this runtime + /// let handle = rt.handle(); + /// + /// // Execute the future, blocking the current thread until completion + /// handle.block_on(async { + /// println!("hello"); + /// }); + /// ``` + /// + /// Or using `Handle::current`: + /// + /// ```no_run + /// use tokio::runtime::Runtime; + /// + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Enter the runtime context. This is necessary for `Handle::curent` to + /// // work + /// let _enter = rt.enter(); + /// + /// // Get a handle to the current runtime and execute the future, blocking + /// // the current thread until completion + /// Handle::current().block_on(async { + /// println!("hello"); + /// }); + /// ``` + /// + /// [`JoinError`]: struct@crate::task::JoinError + /// [`JoinHandle`]: struct@crate::task::JoinHandle + /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on + /// [`Runtime::shutdown_background`]: fn@crate::runtime::Runtime::shutdown_background + /// [`Runtime::shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout + /// [`spawn_blocking`]: crate::task::spawn_blocking + /// [`tokio::fs`]: crate::fs + /// [`tokio::net`]: crate::net + /// [`tokio::time`]: crate::time pub fn block_on(&self, future: F) -> F::Output { // Enter the **runtime** context. This configures spawning, the current I/O driver, ... let _rt_enter = self.enter(); From 7f074036ca53920b920e978cfc64dff9f2a19295 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Tue, 16 Mar 2021 12:08:50 +0100 Subject: [PATCH 11/19] fix grammar --- tokio/src/runtime/handle.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 016fd85e2a9..d2b5a66eab9 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -213,9 +213,9 @@ impl Handle { /// as [`Runtime::block_on`]. See the docs of [`Runtime::block_on`] for more /// details. /// - /// # If the runtime has been shutdown + /// # If the runtime has been shut down /// - /// If the `Handle`'s associated `Runtime` has been shutdown, through + /// If the `Handle`'s associated `Runtime` has been shut down, through /// [`Runtime::shutdown_background`] or [`Runtime::shutdown_timeout`], and /// `Handle::block_on` is used it might return an error or panic. The exact /// behavior depends on the types of futures used. @@ -229,8 +229,8 @@ impl Handle { /// ## [`spawn_blocking`] futures /// /// Futures created with [`spawn_blocking`] will run if they were started - /// before the runtime was shutdown. If they were created after the runtime - /// was shutdown they will get cancelled and the [`JoinHandle`] will return + /// before the runtime was shut down. If they were created after the runtime + /// was shut down they will get cancelled and the [`JoinHandle`] will return /// a [`JoinError`]. /// /// ## File system futures @@ -242,20 +242,20 @@ impl Handle { /// ## I/O future /// /// I/O futures created by something in [`tokio::net`] will return an error - /// regardless if the runtime was shutdown before or after the future was + /// regardless if the runtime was shut down before or after the future was /// created. /// /// ## Timer futures /// /// Timer futures created by something in [`tokio::time`] will panic if the - /// runtime has been shutdown. This is because the function signatures don't + /// runtime has been shut down. This is because the function signatures don't /// allow returning errors. /// /// # Panics /// /// This function panics if the provided future panics, if called within an /// asynchronous execution context, or if a timer future is executed on a - /// runtime that has been shutdown. + /// runtime that has been shut down. /// /// # Examples /// From 9da232b311bca432a308459ed295581cef488860 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Tue, 16 Mar 2021 12:11:18 +0100 Subject: [PATCH 12/19] Tweak example --- tokio/src/runtime/handle.rs | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index d2b5a66eab9..f44698ed19d 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -277,20 +277,14 @@ impl Handle { /// Or using `Handle::current`: /// /// ```no_run - /// use tokio::runtime::Runtime; - /// - /// // Create the runtime - /// let rt = Runtime::new().unwrap(); - /// - /// // Enter the runtime context. This is necessary for `Handle::curent` to - /// // work - /// let _enter = rt.enter(); - /// - /// // Get a handle to the current runtime and execute the future, blocking - /// // the current thread until completion - /// Handle::current().block_on(async { - /// println!("hello"); - /// }); + /// #[tokio::main] + /// async fn main () { + /// // Get a handle to the current runtime and execute the future, blocking + /// // the current thread until completion + /// Handle::current().block_on(async { + /// println!("hello"); + /// }); + /// } /// ``` /// /// [`JoinError`]: struct@crate::task::JoinError From f5fbffab50ccb64882834a97645426438a56b329 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Tue, 16 Mar 2021 12:18:06 +0100 Subject: [PATCH 13/19] Mention drop as a way of shutting down the runtime --- tokio/src/runtime/handle.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index f44698ed19d..42bc164a67e 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -215,10 +215,10 @@ impl Handle { /// /// # If the runtime has been shut down /// - /// If the `Handle`'s associated `Runtime` has been shut down, through - /// [`Runtime::shutdown_background`] or [`Runtime::shutdown_timeout`], and - /// `Handle::block_on` is used it might return an error or panic. The exact - /// behavior depends on the types of futures used. + /// If the `Handle`'s associated `Runtime` has been shut down (through + /// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by + /// dropping it) and `Handle::block_on` is used it might return an error or + /// panic. The exact behavior depends on the types of futures used. /// /// ## Runtime independent futures /// From 1c3f43dcfa27a75029e2257d1b4a4253cacb135b Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Tue, 16 Mar 2021 12:32:35 +0100 Subject: [PATCH 14/19] Adjust docs --- tokio/src/runtime/handle.rs | 34 ++-------------------------------- 1 file changed, 2 insertions(+), 32 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 42bc164a67e..4cd72339b6b 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -218,38 +218,8 @@ impl Handle { /// If the `Handle`'s associated `Runtime` has been shut down (through /// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by /// dropping it) and `Handle::block_on` is used it might return an error or - /// panic. The exact behavior depends on the types of futures used. - /// - /// ## Runtime independent futures - /// - /// Runtime independent futures will run as normal. They are not affected by - /// shutdown. This includes, but is not limited to, channels, signals, and - /// basic futures that don't actually `await` anything. - /// - /// ## [`spawn_blocking`] futures - /// - /// Futures created with [`spawn_blocking`] will run if they were started - /// before the runtime was shut down. If they were created after the runtime - /// was shut down they will get cancelled and the [`JoinHandle`] will return - /// a [`JoinError`]. - /// - /// ## File system futures - /// - /// File system futures created by something in [`tokio::fs`] behave - /// similarly to [`spawn_blocking`] futures. They will run if started before - /// shutdown but fail with an error if started after shutdown. - /// - /// ## I/O future - /// - /// I/O futures created by something in [`tokio::net`] will return an error - /// regardless if the runtime was shut down before or after the future was - /// created. - /// - /// ## Timer futures - /// - /// Timer futures created by something in [`tokio::time`] will panic if the - /// runtime has been shut down. This is because the function signatures don't - /// allow returning errors. + /// panic. Specifically IO resources will return an error and timers will + /// panic. Runtime independent futures will run as normal. /// /// # Panics /// From 6d4fa2c27f9e11a2eedf293b74f4ab44a9c7f5b2 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Tue, 16 Mar 2021 21:04:01 +0100 Subject: [PATCH 15/19] Remove `_` prefix --- tokio/src/runtime/handle.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 4cd72339b6b..65289c71c85 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -271,10 +271,10 @@ impl Handle { let _rt_enter = self.enter(); // Enter a **blocking** context. This prevents blocking from a runtime. - let mut _blocking_enter = crate::runtime::enter(true); + let mut blocking_enter = crate::runtime::enter(true); // Block on the future - _blocking_enter + blocking_enter .block_on(future) .expect("failed to park thread") } From 4a5bdfa8084476d8c8d3a69bb6275c02ec32b40d Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Tue, 16 Mar 2021 21:24:07 +0100 Subject: [PATCH 16/19] Fix docs --- tokio/src/runtime/handle.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 65289c71c85..a42528983ab 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -247,6 +247,8 @@ impl Handle { /// Or using `Handle::current`: /// /// ```no_run + /// use tokio::runtime::Handle; + /// /// #[tokio::main] /// async fn main () { /// // Get a handle to the current runtime and execute the future, blocking From 2372a40a29071b9493764328d899e63fd81a13a6 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Sat, 20 Mar 2021 09:29:22 +0100 Subject: [PATCH 17/19] Update tokio/src/runtime/handle.rs Co-authored-by: Carl Lerche --- tokio/src/runtime/handle.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index a42528983ab..d01581a2c3e 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -209,9 +209,12 @@ impl Handle { /// complete, and yielding its resolved result. Any tasks or timers which /// the future spawns internally will be executed on the runtime. /// - /// The behavior for multi threaded vs current thread schedulers is the same - /// as [`Runtime::block_on`]. See the docs of [`Runtime::block_on`] for more - /// details. + /// When this is used on a `current_thread` runtime, only the + /// [`Runtime::block_on`] method can drive the IO and timer drivers, but the + /// `Handle::block_on` method cannot drive them. This means that, when using + /// this method on a current_thread runtime, anything that relies on IO or + /// timers will not work unless there is another thread currently calling + /// [`Runtime::block_on`] on the same runtime. /// /// # If the runtime has been shut down /// From c83fa53f24ec7728370e865ae34f8e2cd98de3ed Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Sat, 20 Mar 2021 10:10:55 +0100 Subject: [PATCH 18/19] Update tokio/src/runtime/handle.rs Co-authored-by: Alice Ryhl --- tokio/src/runtime/handle.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index d01581a2c3e..ce721aed652 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -254,10 +254,12 @@ impl Handle { /// /// #[tokio::main] /// async fn main () { - /// // Get a handle to the current runtime and execute the future, blocking - /// // the current thread until completion - /// Handle::current().block_on(async { - /// println!("hello"); + /// let handle = Handle::current(); + /// std::thread::spawn(move || { + /// // Using Handle::block_on to run async code in the new thread. + /// handle.block_on(async { + /// println!("hello"); + /// }); /// }); /// } /// ``` From 2e664735c0008dfa1c4dc59c87c522184c47f456 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Sat, 20 Mar 2021 10:14:27 +0100 Subject: [PATCH 19/19] Make `Handle::block_on` examples run --- tokio/src/runtime/handle.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index ce721aed652..9f1a4589091 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -232,7 +232,7 @@ impl Handle { /// /// # Examples /// - /// ```no_run + /// ``` /// use tokio::runtime::Runtime; /// /// // Create the runtime @@ -249,7 +249,7 @@ impl Handle { /// /// Or using `Handle::current`: /// - /// ```no_run + /// ``` /// use tokio::runtime::Handle; /// /// #[tokio::main]