From 85d8029e9bf2982b61b7e7b9b882f17492f35cfc Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Wed, 14 Oct 2020 19:36:24 -0400 Subject: [PATCH] util: Add `TokioContext` future (#2791) (#2958) Co-authored-by: Lucio Franco Co-authored-by: Blas Rodriguez Irizar --- tokio-macros/src/entry.rs | 2 + tokio-util/Cargo.toml | 1 + tokio-util/src/cfg.rs | 10 ++++ tokio-util/src/context.rs | 78 ++++++++++++++++++++++++++++++++ tokio-util/src/lib.rs | 4 ++ tokio-util/tests/context.rs | 29 ++++++++++++ tokio/src/lib.rs | 4 +- tokio/tests/macros_select.rs | 3 +- tokio/tests/rt_common.rs | 2 +- tokio/tests/stream_stream_map.rs | 10 ++-- tokio/tests/sync_broadcast.rs | 2 +- tokio/tests/time_delay_queue.rs | 2 +- 12 files changed, 138 insertions(+), 9 deletions(-) create mode 100644 tokio-util/src/context.rs create mode 100644 tokio-util/tests/context.rs diff --git a/tokio-macros/src/entry.rs b/tokio-macros/src/entry.rs index 2681f50d9c0..a15dfe6bf49 100644 --- a/tokio-macros/src/entry.rs +++ b/tokio-macros/src/entry.rs @@ -1,3 +1,5 @@ +#![allow(clippy::unnecessary_lazy_evaluations)] + use proc_macro::TokenStream; use quote::quote; use std::num::NonZeroUsize; diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index b79bc017783..75db138b9e1 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -29,6 +29,7 @@ full = ["codec", "udp", "compat"] compat = ["futures-io",] codec = ["tokio/stream"] udp = ["tokio/udp"] +rt = ["tokio/rt-core"] [dependencies] tokio = { version = "0.2.5", path = "../tokio" } diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs index 27e8c66a433..ce2f71a5f08 100644 --- a/tokio-util/src/cfg.rs +++ b/tokio-util/src/cfg.rs @@ -18,6 +18,16 @@ macro_rules! cfg_compat { } } +macro_rules! cfg_rt { + ($($item:item)*) => { + $( + #[cfg(feature = "rt")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] + $item + )* + } +} + macro_rules! cfg_udp { ($($item:item)*) => { $( diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs new file mode 100644 index 00000000000..f6289093299 --- /dev/null +++ b/tokio-util/src/context.rs @@ -0,0 +1,78 @@ +//! Tokio context aware futures utilities. +//! +//! This module includes utilities around integrating tokio with other runtimes +//! by allowing the context to be attached to futures. This allows spawning +//! futures on other executors while still using tokio to drive them. This +//! can be useful if you need to use a tokio based library in an executor/runtime +//! that does not provide a tokio context. + +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::runtime::Handle; + +pin_project! { + /// `TokioContext` allows connecting a custom executor with the tokio runtime. + /// + /// It contains a `Handle` to the runtime. A handle to the runtime can be + /// obtain by calling the `Runtime::handle()` method. + pub struct TokioContext { + #[pin] + inner: F, + handle: Handle, + } +} + +impl Future for TokioContext { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + let handle = me.handle; + let fut = me.inner; + + handle.enter(|| fut.poll(cx)) + } +} + +/// Trait extension that simplifies bundling a `Handle` with a `Future`. +pub trait HandleExt { + /// Convenience method that takes a Future and returns a `TokioContext`. + /// + /// # Example: calling Tokio Runtime from a custom ThreadPool + /// + /// ```no_run + /// use tokio_util::context::HandleExt; + /// use tokio::time::{delay_for, Duration}; + /// + /// let mut rt = tokio::runtime::Builder::new() + /// .threaded_scheduler() + /// .enable_all() + /// .build().unwrap(); + /// + /// let rt2 = tokio::runtime::Builder::new() + /// .threaded_scheduler() + /// .build().unwrap(); + /// + /// let fut = delay_for(Duration::from_millis(2)); + /// + /// rt.block_on( + /// rt2 + /// .handle() + /// .wrap(async { delay_for(Duration::from_millis(2)).await }), + /// ); + ///``` + fn wrap(&self, fut: F) -> TokioContext; +} + +impl HandleExt for Handle { + fn wrap(&self, fut: F) -> TokioContext { + TokioContext { + inner: fut, + handle: self.clone(), + } + } +} diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index e888e3e3e3b..32c416b3506 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -35,3 +35,7 @@ cfg_udp! { cfg_compat! { pub mod compat; } + +cfg_rt! { + pub mod context; +} diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs new file mode 100644 index 00000000000..b936b2e44e2 --- /dev/null +++ b/tokio-util/tests/context.rs @@ -0,0 +1,29 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "rt")] + +use tokio::runtime::Builder; +use tokio::time::*; +use tokio_util::context::HandleExt; + +#[test] +fn tokio_context_with_another_runtime() { + let mut rt1 = Builder::new() + .threaded_scheduler() + .core_threads(1) + // no timer! + .build() + .unwrap(); + let rt2 = Builder::new() + .threaded_scheduler() + .core_threads(1) + .enable_all() + .build() + .unwrap(); + + // Without the `HandleExt.wrap()` there would be a panic because there is + // no timer running, since it would be referencing runtime r1. + let _ = rt1.block_on( + rt2.handle() + .wrap(async move { delay_for(Duration::from_millis(2)).await }), + ); +} diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 11a11195094..faddb991cf6 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -2,7 +2,9 @@ #![allow( clippy::cognitive_complexity, clippy::large_enum_variant, - clippy::needless_doctest_main + clippy::needless_doctest_main, + clippy::match_like_matches_macro, + clippy::stable_sort_primitive )] #![warn( missing_debug_implementations, diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index 2ebc4efc1ac..f71fd5f0794 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -1,4 +1,5 @@ -#![allow(clippy::blacklisted_name)] +#![allow(clippy::blacklisted_name, clippy::stable_sort_primitive)] + use tokio::sync::{mpsc, oneshot}; use tokio::task; use tokio_test::{assert_ok, assert_pending, assert_ready}; diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 4211a66705e..c3a60989401 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -1,4 +1,4 @@ -#![allow(clippy::needless_range_loop)] +#![allow(clippy::needless_range_loop, clippy::stable_sort_primitive)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")] diff --git a/tokio/tests/stream_stream_map.rs b/tokio/tests/stream_stream_map.rs index 6b49803234c..ccfae00d2a1 100644 --- a/tokio/tests/stream_stream_map.rs +++ b/tokio/tests/stream_stream_map.rs @@ -1,3 +1,5 @@ +#![allow(clippy::stable_sort_primitive)] + use tokio::stream::{self, pending, Stream, StreamExt, StreamMap}; use tokio::sync::mpsc; use tokio_test::{assert_ok, assert_pending, assert_ready, task}; @@ -213,8 +215,8 @@ fn new_capacity_zero() { let map = StreamMap::<&str, stream::Pending<()>>::new(); assert_eq!(0, map.capacity()); - let keys = map.keys().collect::>(); - assert!(keys.is_empty()); + let mut keys = map.keys(); + assert!(keys.next().is_none()); } #[test] @@ -222,8 +224,8 @@ fn with_capacity() { let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10); assert!(10 <= map.capacity()); - let keys = map.keys().collect::>(); - assert!(keys.is_empty()); + let mut keys = map.keys(); + assert!(keys.next().is_none()); } #[test] diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index e37695b37d9..6f89c7a024c 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -1,4 +1,4 @@ -#![allow(clippy::cognitive_complexity)] +#![allow(clippy::cognitive_complexity, clippy::match_like_matches_macro)] #![warn(rust_2018_idioms)] #![cfg(feature = "sync")] diff --git a/tokio/tests/time_delay_queue.rs b/tokio/tests/time_delay_queue.rs index ba3c985815f..3fd82eb3237 100644 --- a/tokio/tests/time_delay_queue.rs +++ b/tokio/tests/time_delay_queue.rs @@ -1,4 +1,4 @@ -#![allow(clippy::blacklisted_name)] +#![allow(clippy::blacklisted_name, clippy::stable_sort_primitive)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")]