Skip to content

Commit

Permalink
v0.1.x: allow overriding blocking behavior (#1752)
Browse files Browse the repository at this point in the history
## Motivation

The initial version of `tokio-compat`'s compatibility runtime added in
#1663 doesn't support the calls to `tokio_threadpool` 0.1's `blocking`.
This is because (unlike the timer, executor, and reactor), there's no
way to override the global `blocking` functionality in
`tokio-threadpool`.

## Solution

As discussed [here][1], this branch adds APIs to the v0.1.x version of
`tokio-threadpool` that allow overriding the behavior used by calls to
`blocking`. The threadpool crate now exposes `blocking::set_default` and
`blocking::with_default` functions, like `executor`, `timer`, and
`reactor`. This will allow `tokio-compat` to override calls to 0.1's
`blocking` to use the new `tokio` 0.2 blocking APIs.

Unlike the similar APIs in `executor`, `timer`, and `reactor`, the hooks
for overriding blocking behaviour are `#[doc(hidden)]` and have comments
warning against their use outside of `tokio-compat`. In general, there 
probably won't be a compelling reason to override these outside of the 
compatibility layer.

Refs: #1722

[1]: #1663 (comment)

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Nov 12, 2019
1 parent 22b7bd2 commit 9e91b8d
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 52 deletions.
@@ -1,14 +1,55 @@
use worker::Worker;

use super::{BlockingError, BlockingImpl};
use futures::Poll;
use tokio_executor;

use std::error::Error;
use std::cell::Cell;
use std::fmt;
use std::marker::PhantomData;
use tokio_executor::Enter;

thread_local! {
static CURRENT: Cell<BlockingImpl> = Cell::new(super::default_blocking);
}

/// Error raised by `blocking`.
pub struct BlockingError {
_p: (),
/// Ensures that the executor is removed from the thread-local context
/// when leaving the scope. This handles cases that involve panicking.
///
/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
/// backwards-compatibility layer. In general, user code should not override the
/// blocking implementation. If you use this, make sure you know what you're
/// doing.
pub struct DefaultGuard<'a> {
prior: BlockingImpl,
_lifetime: PhantomData<&'a ()>,
}

/// Set the default blocking implementation, returning a guard that resets the
/// blocking implementation when dropped.
///
/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
/// backwards-compatibility layer. In general, user code should not override the
/// blocking implementation. If you use this, make sure you know what you're
/// doing.
pub fn set_default<'a>(blocking: BlockingImpl) -> DefaultGuard<'a> {
CURRENT.with(|cell| {
let prior = cell.replace(blocking);
DefaultGuard {
prior,
_lifetime: PhantomData,
}
})
}

/// Set the default blocking implementation for the duration of the closure.
///
/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
/// backwards-compatibility layer. In general, user code should not override the
/// blocking implementation. If you use this, make sure you know what you're
/// doing.
pub fn with_default<F, R>(blocking: BlockingImpl, enter: &mut Enter, f: F) -> R
where
F: FnOnce(&mut Enter) -> R,
{
let _guard = set_default(blocking);
f(enter)
}

/// Enter a blocking section of code.
Expand Down Expand Up @@ -126,56 +167,52 @@ pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
where
F: FnOnce() -> T,
{
let res = Worker::with_current(|worker| {
let worker = match worker {
Some(worker) => worker,
None => {
return Err(BlockingError { _p: () });
}
};

// Transition the worker state to blocking. This will exit the fn early
// with `NotReady` if the pool does not have enough capacity to enter
// blocking mode.
worker.transition_to_blocking()
});
CURRENT.with(|cell| {
let blocking = cell.get();

// If the transition cannot happen, exit early
try_ready!(res);
// Object-safety workaround: the `Blocking` trait must be object-safe,
// since we use a trait object in the thread-local. However, a blocking
// _operation_ will be generic over the return type of the blocking
// function. Therefore, rather than passing a function with a return
// type to `Blocking::run_blocking`, we pass a _new_ closure which
// doesn't have a return value. That closure invokes the blocking
// function and assigns its value to `ret`, which we then unpack when
// the blocking call finishes.
let mut f = Some(f);
let mut ret = None;
{
let ret2 = &mut ret;
let mut run = move || {
let f = f
.take()
.expect("blocking closure invoked twice; this is a bug!");
*ret2 = Some((f)());
};

// Currently in blocking mode, so call the inner closure.
//
// "Exit" the current executor in case the blocking function wants
// to call a different executor.
let ret = tokio_executor::exit(move || f());
try_ready!((blocking)(&mut run));
}

// Try to transition out of blocking mode. This is a fast path that takes
// back ownership of the worker if the worker handoff didn't complete yet.
Worker::with_current(|worker| {
// Worker must be set since it was above.
worker.unwrap().transition_from_blocking();
});

// Return the result
Ok(ret.into())
// Return the result
let ret =
ret.expect("blocking function finished, but return value was unset; this is a bug!");
Ok(ret.into())
})
}

impl fmt::Display for BlockingError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", self.description())
}
}
// === impl DefaultGuard ===

impl fmt::Debug for BlockingError {
impl<'a> fmt::Debug for DefaultGuard<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BlockingError")
.field("reason", &self.description())
.finish()
f.pad("DefaultGuard { .. }")
}
}

impl Error for BlockingError {
fn description(&self) -> &str {
"`blocking` annotation used from outside the context of a thread pool"
impl<'a> Drop for DefaultGuard<'a> {
fn drop(&mut self) {
// if the TLS value has already been torn down, there's nothing else we
// can do. we're almost certainly panicking anyway.
let _ = CURRENT.try_with(|cell| {
cell.set(self.prior);
});
}
}
88 changes: 88 additions & 0 deletions tokio-threadpool/src/blocking/mod.rs
@@ -0,0 +1,88 @@
use worker::Worker;

use futures::{Async, Poll};
use tokio_executor;

use std::error::Error;
use std::fmt;

mod global;
pub use self::global::blocking;
#[doc(hidden)]
pub use self::global::{set_default, with_default, DefaultGuard};

/// Error raised by `blocking`.
pub struct BlockingError {
_p: (),
}

/// A function implementing the behavior run on calls to `blocking`.
///
/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
/// backwards-compatibility layer. In general, user code should not override the
/// blocking implementation. If you use this, make sure you know what you're
/// doing.
#[doc(hidden)]
pub type BlockingImpl = fn(&mut dyn FnMut()) -> Poll<(), BlockingError>;

fn default_blocking(f: &mut dyn FnMut()) -> Poll<(), BlockingError> {
let res = Worker::with_current(|worker| {
let worker = match worker {
Some(worker) => worker,
None => {
return Err(BlockingError::new());
}
};

// Transition the worker state to blocking. This will exit the fn early
// with `NotReady` if the pool does not have enough capacity to enter
// blocking mode.
worker.transition_to_blocking()
});

// If the transition cannot happen, exit early
try_ready!(res);

// Currently in blocking mode, so call the inner closure.
//
// "Exit" the current executor in case the blocking function wants
// to call a different executor.
tokio_executor::exit(move || (f)());

// Try to transition out of blocking mode. This is a fast path that takes
// back ownership of the worker if the worker handoff didn't complete yet.
Worker::with_current(|worker| {
// Worker must be set since it was above.
worker.unwrap().transition_from_blocking();
});

Ok(Async::Ready(()))
}

impl BlockingError {
/// Returns a new `BlockingError`.
#[doc(hidden)]
pub fn new() -> Self {
Self { _p: () }
}
}

impl fmt::Display for BlockingError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", self.description())
}
}

impl fmt::Debug for BlockingError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BlockingError")
.field("reason", &self.description())
.finish()
}
}

impl Error for BlockingError {
fn description(&self) -> &str {
"`blocking` annotation used from outside the context of a thread pool"
}
}
6 changes: 3 additions & 3 deletions tokio-threadpool/src/lib.rs
Expand Up @@ -142,13 +142,13 @@ extern crate log;
//
// [Treiber stack]: https://en.wikipedia.org/wiki/Treiber_Stack

pub mod park;

mod blocking;
#[doc(hidden)]
pub mod blocking;
mod builder;
mod callback;
mod config;
mod notifier;
pub mod park;
mod pool;
mod sender;
mod shutdown;
Expand Down

0 comments on commit 9e91b8d

Please sign in to comment.