Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: Add TokioContext future #2791

Merged
merged 18 commits into from Aug 27, 2020
Merged
83 changes: 83 additions & 0 deletions tokio-util/src/context.rs
@@ -0,0 +1,83 @@
//! This module introduces `TokioContext`, which enables a simple way of using
blasrodri marked this conversation as resolved.
Show resolved Hide resolved
//! the tokio runtime with any other executor.
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved

use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::runtime::Handle;

pin_project! {
/// TokioContext allows connecting a custom executor with the tokio runtime.
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
///
/// It contains a `Handle` to the runtime. A handle to the runtime can be
/// obtain by calling the `Runtime::handle()` method.
pub struct TokioContext<F> {
#[pin]
inner: F,
handle: Handle,
}
}

impl<F: Future> Future for TokioContext<F> {
type Output = F::Output;
blasrodri marked this conversation as resolved.
Show resolved Hide resolved
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
let handle = me.handle;
let fut = me.inner;

handle.enter(|| fut.poll(cx))
}
}
blasrodri marked this conversation as resolved.
Show resolved Hide resolved
/// Trait extension that simplifies bundling a `Handle` with a `Future`.
pub trait HandleExt: Into<Handle> + Clone {
blasrodri marked this conversation as resolved.
Show resolved Hide resolved
/// Convenience method that takes a Future and returns a `TokioContext`.
///
/// # Example: calling Tokio Runtime from a custom ThreadPool
///
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
/// use std::futures::{self, Future};
blasrodri marked this conversation as resolved.
Show resolved Hide resolved
/// use tokio-utils::context::HandleExt;
/// use tokio::runtime::{Builder, Handle, Runtime};
/// use tokio::time::{delay_for, Duration};
///
/// struct ThreadPool {
/// pub inner: futures::executor::ThreadPool,
/// pub rt: Runtime,
/// }
///
/// impl ThreadPool {
/// fn spawn(&self, f: impl Future<Output = ()> + Send + 'static) {
/// let handle = self.rt.handle().clone();
/// // create a TokioContext from the handle and future.
/// let h = handle.wrap(f);
/// self.inner.spawn_ok(h);
/// }
/// }
///
/// let rt = tokio::runtime::Builder::new()
/// .threaded_scheduler()
/// .enable_all()
/// .build.unwrap();
/// let inner = futures::executor::ThreadPool::builder().create().unwrap();
/// let executor: ThreadPool {
/// inner, rt
/// }
///
/// executor.spawn(async {
/// delay_for(Duration::from_millis(2)).await
/// });

blasrodri marked this conversation as resolved.
Show resolved Hide resolved
fn wrap<F>(&self, fut: F) -> TokioContext<F>
where
F: Future,
{
TokioContext {
inner: fut,
handle: (*self).clone().into(),
}
}
}

impl HandleExt for Handle {}
2 changes: 2 additions & 0 deletions tokio-util/src/lib.rs
Expand Up @@ -38,4 +38,6 @@ cfg_compat! {
pub mod compat;
}

pub mod context;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hawkw do you think we should feature flag this around rt?


pub mod sync;
27 changes: 27 additions & 0 deletions tokio-util/tests/context.rs
@@ -0,0 +1,27 @@
#![warn(rust_2018_idioms)]

use tokio::runtime::Builder;
use tokio::time::*;
use tokio_util::context::HandleExt;
blasrodri marked this conversation as resolved.
Show resolved Hide resolved
#[test]
fn tokio_context_with_another_runtime() {
let mut rt1 = Builder::new()
.threaded_scheduler()
.core_threads(1)
// no timer!
.build()
.unwrap();
let rt2 = Builder::new()
.threaded_scheduler()
.core_threads(1)
.enable_all()
.build()
.unwrap();

// Without the `HandleExt.wrap()` there would be a panic because there is
// no timer running, since it would be referencing runtime r1.
let _ = rt1.block_on(
rt2.handle()
.wrap(async move { delay_for(Duration::from_millis(2)).await }),
);
}