From d9d909cb4c6d326423ee02fbcf6bbfe5553d2c0a Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Thu, 27 Aug 2020 17:33:43 +0200 Subject: [PATCH] util: Add `TokioContext` future (#2791) Co-authored-by: Lucio Franco --- tokio-util/src/context.rs | 78 +++++++++++++++++++++++++++++++++++++ tokio-util/src/lib.rs | 2 + tokio-util/tests/context.rs | 28 +++++++++++++ 3 files changed, 108 insertions(+) create mode 100644 tokio-util/src/context.rs create mode 100644 tokio-util/tests/context.rs diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs new file mode 100644 index 00000000000..f6289093299 --- /dev/null +++ b/tokio-util/src/context.rs @@ -0,0 +1,78 @@ +//! Tokio context aware futures utilities. +//! +//! This module includes utilities around integrating tokio with other runtimes +//! by allowing the context to be attached to futures. This allows spawning +//! futures on other executors while still using tokio to drive them. This +//! can be useful if you need to use a tokio based library in an executor/runtime +//! that does not provide a tokio context. + +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::runtime::Handle; + +pin_project! { + /// `TokioContext` allows connecting a custom executor with the tokio runtime. + /// + /// It contains a `Handle` to the runtime. A handle to the runtime can be + /// obtain by calling the `Runtime::handle()` method. + pub struct TokioContext { + #[pin] + inner: F, + handle: Handle, + } +} + +impl Future for TokioContext { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + let handle = me.handle; + let fut = me.inner; + + handle.enter(|| fut.poll(cx)) + } +} + +/// Trait extension that simplifies bundling a `Handle` with a `Future`. +pub trait HandleExt { + /// Convenience method that takes a Future and returns a `TokioContext`. + /// + /// # Example: calling Tokio Runtime from a custom ThreadPool + /// + /// ```no_run + /// use tokio_util::context::HandleExt; + /// use tokio::time::{delay_for, Duration}; + /// + /// let mut rt = tokio::runtime::Builder::new() + /// .threaded_scheduler() + /// .enable_all() + /// .build().unwrap(); + /// + /// let rt2 = tokio::runtime::Builder::new() + /// .threaded_scheduler() + /// .build().unwrap(); + /// + /// let fut = delay_for(Duration::from_millis(2)); + /// + /// rt.block_on( + /// rt2 + /// .handle() + /// .wrap(async { delay_for(Duration::from_millis(2)).await }), + /// ); + ///``` + fn wrap(&self, fut: F) -> TokioContext; +} + +impl HandleExt for Handle { + fn wrap(&self, fut: F) -> TokioContext { + TokioContext { + inner: fut, + handle: self.clone(), + } + } +} diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index ec69f59d04b..3e9a3b7e6db 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -38,4 +38,6 @@ cfg_compat! { pub mod compat; } +pub mod context; + pub mod sync; diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs new file mode 100644 index 00000000000..49038ddbdb4 --- /dev/null +++ b/tokio-util/tests/context.rs @@ -0,0 +1,28 @@ +#![warn(rust_2018_idioms)] + +use tokio::runtime::Builder; +use tokio::time::*; +use tokio_util::context::HandleExt; + +#[test] +fn tokio_context_with_another_runtime() { + let mut rt1 = Builder::new() + .threaded_scheduler() + .core_threads(1) + // no timer! + .build() + .unwrap(); + let rt2 = Builder::new() + .threaded_scheduler() + .core_threads(1) + .enable_all() + .build() + .unwrap(); + + // Without the `HandleExt.wrap()` there would be a panic because there is + // no timer running, since it would be referencing runtime r1. + let _ = rt1.block_on( + rt2.handle() + .wrap(async move { delay_for(Duration::from_millis(2)).await }), + ); +}