Skip to content

Commit

Permalink
runtime: add Handle::block_on (#3569)
Browse files Browse the repository at this point in the history
Add `runtime::Handle::block_on`. The function enters the runtime context and
blocks the current thread while the future executes.

Refs: #3097
Fixes #2965, #3096
  • Loading branch information
davidpdrsn committed Mar 20, 2021
1 parent e4f7668 commit c39d986
Show file tree
Hide file tree
Showing 9 changed files with 675 additions and 44 deletions.
5 changes: 4 additions & 1 deletion tokio/src/io/driver/registration.rs
Expand Up @@ -235,7 +235,10 @@ cfg_io_readiness! {

crate::future::poll_fn(|cx| {
if self.handle.inner().is_none() {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
)));
}

Pin::new(&mut fut).poll(cx).map(Ok)
Expand Down
87 changes: 87 additions & 0 deletions tokio/src/runtime/handle.rs
Expand Up @@ -202,6 +202,93 @@ impl Handle {
let _ = self.blocking_spawner.spawn(task, &self);
handle
}

/// Run a future to completion on this `Handle`'s associated `Runtime`.
///
/// This runs the given future on the runtime, blocking until it is
/// complete, and yielding its resolved result. Any tasks or timers which
/// the future spawns internally will be executed on the runtime.
///
/// When this is used on a `current_thread` runtime, only the
/// [`Runtime::block_on`] method can drive the IO and timer drivers, but the
/// `Handle::block_on` method cannot drive them. This means that, when using
/// this method on a current_thread runtime, anything that relies on IO or
/// timers will not work unless there is another thread currently calling
/// [`Runtime::block_on`] on the same runtime.
///
/// # If the runtime has been shut down
///
/// If the `Handle`'s associated `Runtime` has been shut down (through
/// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by
/// dropping it) and `Handle::block_on` is used it might return an error or
/// panic. Specifically IO resources will return an error and timers will
/// panic. Runtime independent futures will run as normal.
///
/// # Panics
///
/// This function panics if the provided future panics, if called within an
/// asynchronous execution context, or if a timer future is executed on a
/// runtime that has been shut down.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
///
/// // Get a handle from this runtime
/// let handle = rt.handle();
///
/// // Execute the future, blocking the current thread until completion
/// handle.block_on(async {
/// println!("hello");
/// });
/// ```
///
/// Or using `Handle::current`:
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main () {
/// let handle = Handle::current();
/// std::thread::spawn(move || {
/// // Using Handle::block_on to run async code in the new thread.
/// handle.block_on(async {
/// println!("hello");
/// });
/// });
/// }
/// ```
///
/// [`JoinError`]: struct@crate::task::JoinError
/// [`JoinHandle`]: struct@crate::task::JoinHandle
/// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on
/// [`Runtime::shutdown_background`]: fn@crate::runtime::Runtime::shutdown_background
/// [`Runtime::shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
/// [`spawn_blocking`]: crate::task::spawn_blocking
/// [`tokio::fs`]: crate::fs
/// [`tokio::net`]: crate::net
/// [`tokio::time`]: crate::time
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
// Enter the **runtime** context. This configures spawning, the current I/O driver, ...
let _rt_enter = self.enter();

// Enter a **blocking** context. This prevents blocking from a runtime.
let mut blocking_enter = crate::runtime::enter(true);

// Block on the future
blocking_enter
.block_on(future)
.expect("failed to park thread")
}

pub(crate) fn shutdown(mut self) {
self.spawner.shutdown();
}
}

/// Error returned by `try_current` when no Runtime has been started
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/mod.rs
Expand Up @@ -526,7 +526,7 @@ cfg_rt! {
/// ```
pub fn shutdown_timeout(mut self, duration: Duration) {
// Wakeup and shutdown all the worker threads
self.handle.spawner.shutdown();
self.handle.shutdown();
self.blocking_pool.shutdown(Some(duration));
}

Expand Down
4 changes: 4 additions & 0 deletions tokio/src/time/driver/entry.rs
Expand Up @@ -543,6 +543,10 @@ impl TimerEntry {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), super::Error>> {
if self.driver.is_shutdown() {
panic!(crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR);
}

if let Some(deadline) = self.initial_deadline {
self.as_mut().reset(deadline);
}
Expand Down
19 changes: 12 additions & 7 deletions tokio/src/time/driver/handle.rs
@@ -1,18 +1,18 @@
use crate::loom::sync::{Arc, Mutex};
use crate::loom::sync::Arc;
use crate::time::driver::ClockTime;
use std::fmt;

/// Handle to time driver instance.
#[derive(Clone)]
pub(crate) struct Handle {
time_source: ClockTime,
inner: Arc<Mutex<super::Inner>>,
inner: Arc<super::Inner>,
}

impl Handle {
/// Creates a new timer `Handle` from a shared `Inner` timer state.
pub(super) fn new(inner: Arc<Mutex<super::Inner>>) -> Self {
let time_source = inner.lock().time_source.clone();
pub(super) fn new(inner: Arc<super::Inner>) -> Self {
let time_source = inner.state.lock().time_source.clone();
Handle { time_source, inner }
}

Expand All @@ -21,9 +21,14 @@ impl Handle {
&self.time_source
}

/// Locks the driver's inner structure
pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, super::Inner> {
self.inner.lock()
/// Access the driver's inner structure
pub(super) fn get(&self) -> &super::Inner {
&*self.inner
}

// Check whether the driver has been shutdown
pub(super) fn is_shutdown(&self) -> bool {
self.inner.is_shutdown()
}
}

Expand Down
71 changes: 43 additions & 28 deletions tokio/src/time/driver/mod.rs
Expand Up @@ -16,6 +16,7 @@ mod wheel;

pub(super) mod sleep;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::time::error::Error;
Expand Down Expand Up @@ -86,7 +87,7 @@ pub(crate) struct Driver<P: Park + 'static> {
time_source: ClockTime,

/// Shared state
inner: Handle,
handle: Handle,

/// Parker to delegate to
park: P,
Expand Down Expand Up @@ -132,7 +133,16 @@ impl ClockTime {
}

/// Timer state shared between `Driver`, `Handle`, and `Registration`.
pub(self) struct Inner {
struct Inner {
// The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
pub(super) state: Mutex<InnerState>,

/// True if the driver is being shutdown
pub(super) is_shutdown: AtomicBool,
}

/// Time state shared which must be protected by a `Mutex`
struct InnerState {
/// Timing backend in use
time_source: ClockTime,

Expand All @@ -145,9 +155,6 @@ pub(self) struct Inner {
/// Timer wheel
wheel: wheel::Wheel,

/// True if the driver is being shutdown
is_shutdown: bool,

/// Unparker that can be used to wake the time driver
unpark: Box<dyn Unpark>,
}
Expand All @@ -169,7 +176,7 @@ where

Driver {
time_source,
inner: Handle::new(Arc::new(Mutex::new(inner))),
handle: Handle::new(Arc::new(inner)),
park,
}
}
Expand All @@ -181,15 +188,15 @@ where
/// `with_default`, setting the timer as the default timer for the execution
/// context.
pub(crate) fn handle(&self) -> Handle {
self.inner.clone()
self.handle.clone()
}

fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
let clock = &self.time_source.clock;

let mut lock = self.inner.lock();
let mut lock = self.handle.get().state.lock();

assert!(!lock.is_shutdown);
assert!(!self.handle.is_shutdown());

let next_wake = lock.wheel.next_expiration_time();
lock.next_wake =
Expand Down Expand Up @@ -237,7 +244,7 @@ where
}

// Process pending timers after waking up
self.inner.process();
self.handle.process();

Ok(())
}
Expand All @@ -255,7 +262,7 @@ impl Handle {
let mut waker_list: [Option<Waker>; 32] = Default::default();
let mut waker_idx = 0;

let mut lock = self.lock();
let mut lock = self.get().lock();

assert!(now >= lock.elapsed);

Expand All @@ -278,7 +285,7 @@ impl Handle {

waker_idx = 0;

lock = self.lock();
lock = self.get().lock();
}
}
}
Expand Down Expand Up @@ -309,7 +316,7 @@ impl Handle {
/// `add_entry` must not be called concurrently.
pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
unsafe {
let mut lock = self.lock();
let mut lock = self.get().lock();

if entry.as_ref().might_be_registered() {
lock.wheel.remove(entry);
Expand All @@ -327,7 +334,7 @@ impl Handle {
/// the `TimerEntry`)
pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) {
let waker = unsafe {
let mut lock = self.lock();
let mut lock = self.get().lock();

// We may have raced with a firing/deregistration, so check before
// deregistering.
Expand All @@ -338,7 +345,7 @@ impl Handle {
// Now that we have exclusive control of this entry, mint a handle to reinsert it.
let entry = entry.as_ref().handle();

if lock.is_shutdown {
if self.is_shutdown() {
unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
} else {
entry.set_expiration(new_tick);
Expand Down Expand Up @@ -396,19 +403,15 @@ where
}

fn shutdown(&mut self) {
let mut lock = self.inner.lock();

if lock.is_shutdown {
if self.handle.is_shutdown() {
return;
}

lock.is_shutdown = true;

drop(lock);
self.handle.get().is_shutdown.store(true, Ordering::SeqCst);

// Advance time forward to the end of time.

self.inner.process_at_time(u64::MAX);
self.handle.process_at_time(u64::MAX);

self.park.shutdown();
}
Expand All @@ -428,14 +431,26 @@ where
impl Inner {
pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self {
Inner {
time_source,
elapsed: 0,
next_wake: None,
unpark,
wheel: wheel::Wheel::new(),
is_shutdown: false,
state: Mutex::new(InnerState {
time_source,
elapsed: 0,
next_wake: None,
unpark,
wheel: wheel::Wheel::new(),
}),
is_shutdown: AtomicBool::new(false),
}
}

/// Locks the driver's inner structure
pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
self.state.lock()
}

// Check whether the driver has been shutdown
pub(super) fn is_shutdown(&self) -> bool {
self.is_shutdown.load(Ordering::SeqCst)
}
}

impl fmt::Debug for Inner {
Expand Down

0 comments on commit c39d986

Please sign in to comment.