rt: add Handle::block_on #3569

Merged
19 commits merged on Mar 20, 2021
5 changes: 4 additions & 1 deletion tokio/src/io/driver/registration.rs
@@ -235,7 +235,10 @@ cfg_io_readiness! {

crate::future::poll_fn(|cx| {
if self.handle.inner().is_none() {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
)));
}

Pin::new(&mut fut).poll(cx).map(Ok)
87 changes: 87 additions & 0 deletions tokio/src/runtime/handle.rs
@@ -202,6 +202,93 @@ impl Handle {
let _ = self.blocking_spawner.spawn(task, &self);
handle
}

/// Run a future to completion on this `Handle`'s associated `Runtime`.
///
/// This runs the given future on the runtime, blocking until it is
/// complete, and yielding its resolved result. Any tasks or timers which
/// the future spawns internally will be executed on the runtime.
///
    /// When this is used on a `current_thread` runtime, only the
    /// [`Runtime::block_on`] method can drive the IO and timer drivers; the
    /// `Handle::block_on` method cannot drive them. This means that, when using
    /// this method on a `current_thread` runtime, anything that relies on IO or
    /// timers will not work unless there is another thread currently calling
    /// [`Runtime::block_on`] on the same runtime.
///
/// # If the runtime has been shut down
///
    /// If the `Handle`'s associated `Runtime` has been shut down (through
    /// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by
    /// dropping it) and `Handle::block_on` is used, it might return an error or
    /// panic. Specifically, IO resources will return an error and timers will
    /// panic. Runtime-independent futures will run as normal.
///
/// # Panics
///
/// This function panics if the provided future panics, if called within an
/// asynchronous execution context, or if a timer future is executed on a
/// runtime that has been shut down.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
///
/// // Get a handle from this runtime
/// let handle = rt.handle();
///
/// // Execute the future, blocking the current thread until completion
/// handle.block_on(async {
/// println!("hello");
/// });
/// ```
///
/// Or using `Handle::current`:
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
    /// async fn main() {
/// let handle = Handle::current();
/// std::thread::spawn(move || {
/// // Using Handle::block_on to run async code in the new thread.
/// handle.block_on(async {
/// println!("hello");
/// });
/// });
/// }
/// ```
///
/// [`JoinError`]: struct@crate::task::JoinError
/// [`JoinHandle`]: struct@crate::task::JoinHandle
/// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on
/// [`Runtime::shutdown_background`]: fn@crate::runtime::Runtime::shutdown_background
/// [`Runtime::shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
/// [`spawn_blocking`]: crate::task::spawn_blocking
/// [`tokio::fs`]: crate::fs
/// [`tokio::net`]: crate::net
/// [`tokio::time`]: crate::time
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
// Enter the **runtime** context. This configures spawning, the current I/O driver, ...
let _rt_enter = self.enter();

// Enter a **blocking** context. This prevents blocking from a runtime.
let mut blocking_enter = crate::runtime::enter(true);

// Block on the future
blocking_enter
.block_on(future)
.expect("failed to park thread")
}
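
As an illustration of the `current_thread` caveat in the docs above, here is a minimal sketch (assuming tokio's `rt`, `time`, and `sync` features are enabled; not part of this diff) where a spawned thread uses `Handle::block_on` while the main thread keeps `Runtime::block_on` running to drive the timer driver:

```rust
use tokio::runtime::Builder;
use tokio::sync::oneshot;
use tokio::time::{sleep, Duration};

fn main() {
    // A current_thread runtime: Handle::block_on alone cannot drive IO or timers.
    let rt = Builder::new_current_thread().enable_all().build().unwrap();
    let handle = rt.handle().clone();
    let (tx, rx) = oneshot::channel();

    let worker = std::thread::spawn(move || {
        // The sleep below only makes progress because the main thread is
        // inside Runtime::block_on, which drives the time driver.
        handle.block_on(async {
            sleep(Duration::from_millis(10)).await;
            tx.send("done").unwrap();
        });
    });

    // Keep Runtime::block_on running so the worker's timer can fire.
    let msg = rt.block_on(async { rx.await.unwrap() });
    assert_eq!(msg, "done");
    worker.join().unwrap();
}
```

If the main thread were not inside `Runtime::block_on`, the worker's `sleep` would never fire and its `block_on` call would block forever.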

pub(crate) fn shutdown(mut self) {
self.spawner.shutdown();
}
}

/// Error returned by `try_current` when no Runtime has been started
2 changes: 1 addition & 1 deletion tokio/src/runtime/mod.rs
@@ -526,7 +526,7 @@ cfg_rt! {
/// ```
pub fn shutdown_timeout(mut self, duration: Duration) {
// Wakeup and shutdown all the worker threads
self.handle.spawner.shutdown();
self.handle.shutdown();
self.blocking_pool.shutdown(Some(duration));
}

4 changes: 4 additions & 0 deletions tokio/src/time/driver/entry.rs
@@ -543,6 +543,10 @@ impl TimerEntry {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), super::Error>> {
if self.driver.is_shutdown() {
panic!(crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR);
}

if let Some(deadline) = self.initial_deadline {
self.as_mut().reset(deadline);
}
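
To make the shutdown behaviour documented on `Handle::block_on` concrete, a small sketch (illustrative only, not part of this diff): runtime-independent futures still complete after shutdown, while a timer future would hit the panic path added above:

```rust
use std::time::Duration;
use tokio::runtime::Runtime;

fn main() {
    let rt = Runtime::new().unwrap();
    let handle = rt.handle().clone();

    // Shut the runtime down; the handle outlives it.
    rt.shutdown_timeout(Duration::from_secs(1));

    // Runtime-independent futures still run to completion.
    let answer = handle.block_on(async { 40 + 2 });
    assert_eq!(answer, 42);

    // By contrast, a timer future would now hit the panic added above,
    // and IO resources would return an error:
    //
    // handle.block_on(async {
    //     tokio::time::sleep(Duration::from_millis(1)).await; // panics
    // });
}
```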
19 changes: 12 additions & 7 deletions tokio/src/time/driver/handle.rs
@@ -1,18 +1,18 @@
use crate::loom::sync::{Arc, Mutex};
use crate::loom::sync::Arc;
use crate::time::driver::ClockTime;
use std::fmt;

/// Handle to time driver instance.
#[derive(Clone)]
pub(crate) struct Handle {
time_source: ClockTime,
inner: Arc<Mutex<super::Inner>>,
inner: Arc<super::Inner>,
}

impl Handle {
/// Creates a new timer `Handle` from a shared `Inner` timer state.
pub(super) fn new(inner: Arc<Mutex<super::Inner>>) -> Self {
let time_source = inner.lock().time_source.clone();
pub(super) fn new(inner: Arc<super::Inner>) -> Self {
let time_source = inner.state.lock().time_source.clone();
Handle { time_source, inner }
}

@@ -21,9 +21,14 @@ impl Handle {
&self.time_source
}

/// Locks the driver's inner structure
pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, super::Inner> {
self.inner.lock()
/// Access the driver's inner structure
pub(super) fn get(&self) -> &super::Inner {
&*self.inner
}

// Check whether the driver has been shut down
pub(super) fn is_shutdown(&self) -> bool {
self.inner.is_shutdown()
}
}

71 changes: 43 additions & 28 deletions tokio/src/time/driver/mod.rs
@@ -16,6 +16,7 @@ mod wheel;

pub(super) mod sleep;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::time::error::Error;
@@ -86,7 +87,7 @@ pub(crate) struct Driver<P: Park + 'static> {
time_source: ClockTime,

/// Shared state
inner: Handle,
handle: Handle,

/// Parker to delegate to
park: P,
@@ -132,7 +133,16 @@ impl ClockTime {
}

/// Timer state shared between `Driver`, `Handle`, and `Registration`.
pub(self) struct Inner {
struct Inner {
// The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
pub(super) state: Mutex<InnerState>,

    /// True if the driver is being shut down
pub(super) is_shutdown: AtomicBool,
}

/// Shared timer state which must be protected by a `Mutex`
struct InnerState {
/// Timing backend in use
time_source: ClockTime,

@@ -145,9 +155,6 @@ pub(self) struct Inner {
/// Timer wheel
wheel: wheel::Wheel,

/// True if the driver is being shutdown
is_shutdown: bool,

/// Unparker that can be used to wake the time driver
unpark: Box<dyn Unpark>,
}
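
The split of `Inner` into a lock-free `is_shutdown` flag plus a `Mutex<InnerState>` is a general pattern: the hot read moves out of the mutex so handles can check it without contending. A standalone sketch of the same idea (names and types here are illustrative, not tokio internals):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Illustrative stand-in for the driver's shared state.
struct Shared {
    /// Hot flag, readable without taking the lock.
    is_shutdown: AtomicBool,
    /// Everything that genuinely needs mutual exclusion.
    state: Mutex<State>,
}

struct State {
    pending: Vec<u64>,
}

impl Shared {
    fn new() -> Self {
        Shared {
            is_shutdown: AtomicBool::new(false),
            state: Mutex::new(State { pending: Vec::new() }),
        }
    }

    /// Cheap check usable from any handle, no mutex contention.
    fn is_shutdown(&self) -> bool {
        self.is_shutdown.load(Ordering::SeqCst)
    }

    /// Mutations still go through the mutex; the flag check is advisory.
    fn push(&self, item: u64) {
        if !self.is_shutdown() {
            self.state.lock().unwrap().pending.push(item);
        }
    }

    fn shutdown(&self) {
        self.is_shutdown.store(true, Ordering::SeqCst);
    }
}

fn main() {
    let shared = Shared::new();
    shared.push(1);
    shared.shutdown();
    shared.push(2); // ignored: already shut down
    assert_eq!(shared.state.lock().unwrap().pending, vec![1u64]);
}
```

The unlocked check is advisory; operations that must observe shutdown exactly still take the lock or, as in `reregister` above, handle the shutdown case explicitly.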
@@ -169,7 +176,7 @@ where

Driver {
time_source,
inner: Handle::new(Arc::new(Mutex::new(inner))),
handle: Handle::new(Arc::new(inner)),
park,
}
}
@@ -181,15 +188,15 @@
/// `with_default`, setting the timer as the default timer for the execution
/// context.
pub(crate) fn handle(&self) -> Handle {
self.inner.clone()
self.handle.clone()
}

fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
let clock = &self.time_source.clock;

let mut lock = self.inner.lock();
let mut lock = self.handle.get().state.lock();

assert!(!lock.is_shutdown);
assert!(!self.handle.is_shutdown());

let next_wake = lock.wheel.next_expiration_time();
lock.next_wake =
@@ -237,7 +244,7 @@
}

// Process pending timers after waking up
self.inner.process();
self.handle.process();

Ok(())
}
@@ -255,7 +262,7 @@ impl Handle {
let mut waker_list: [Option<Waker>; 32] = Default::default();
let mut waker_idx = 0;

let mut lock = self.lock();
let mut lock = self.get().lock();

assert!(now >= lock.elapsed);

@@ -278,7 +285,7 @@

waker_idx = 0;

lock = self.lock();
lock = self.get().lock();
}
}
}
@@ -309,7 +316,7 @@ impl Handle {
/// `add_entry` must not be called concurrently.
pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
unsafe {
let mut lock = self.lock();
let mut lock = self.get().lock();

if entry.as_ref().might_be_registered() {
lock.wheel.remove(entry);
@@ -327,7 +334,7 @@
/// the `TimerEntry`)
pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) {
let waker = unsafe {
let mut lock = self.lock();
let mut lock = self.get().lock();

// We may have raced with a firing/deregistration, so check before
// deregistering.
@@ -338,7 +345,7 @@
// Now that we have exclusive control of this entry, mint a handle to reinsert it.
let entry = entry.as_ref().handle();

if lock.is_shutdown {
if self.is_shutdown() {
unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
} else {
entry.set_expiration(new_tick);
@@ -396,19 +403,15 @@
}

fn shutdown(&mut self) {
let mut lock = self.inner.lock();

if lock.is_shutdown {
if self.handle.is_shutdown() {
return;
}

lock.is_shutdown = true;

drop(lock);
self.handle.get().is_shutdown.store(true, Ordering::SeqCst);

// Advance time forward to the end of time.

self.inner.process_at_time(u64::MAX);
self.handle.process_at_time(u64::MAX);

self.park.shutdown();
}
@@ -428,14 +431,26 @@
impl Inner {
pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self {
Inner {
time_source,
elapsed: 0,
next_wake: None,
unpark,
wheel: wheel::Wheel::new(),
is_shutdown: false,
state: Mutex::new(InnerState {
time_source,
elapsed: 0,
next_wake: None,
unpark,
wheel: wheel::Wheel::new(),
}),
is_shutdown: AtomicBool::new(false),
}
}

/// Locks the driver's inner structure
pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
self.state.lock()
}

// Check whether the driver has been shut down
pub(super) fn is_shutdown(&self) -> bool {
self.is_shutdown.load(Ordering::SeqCst)
}
}

impl fmt::Debug for Inner {